diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/auth.go b/src/github.com/matrix-org/dendrite/clientapi/auth/auth.go index 9f350b4b0..a661c1f81 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/auth.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/auth.go @@ -24,7 +24,6 @@ import ( "strings" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/util" ) @@ -40,10 +39,16 @@ var UnknownDeviceID = "unknown-device" // 32 bytes => 256 bits var tokenByteLength = 32 +// DeviceDatabase represents a device database. +type DeviceDatabase interface { + // Lookup the device matching the given access token. + GetDeviceByAccessToken(token string) (*authtypes.Device, error) +} + // VerifyAccessToken verifies that an access token was supplied in the given HTTP request // and returns the device it corresponds to. Returns resErr (an error response which can be // sent to the client) if the token is invalid or there was a problem querying the database. -func VerifyAccessToken(req *http.Request, deviceDB *devices.Database) (device *authtypes.Device, resErr *util.JSONResponse) { +func VerifyAccessToken(req *http.Request, deviceDB DeviceDatabase) (device *authtypes.Device, resErr *util.JSONResponse) { token, err := extractAccessToken(req) if err != nil { resErr = &util.JSONResponse{ diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go index 8f39cad3c..a58c26346 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/storage/devices/storage.go @@ -18,6 +18,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -53,7 +54,7 @@ func (d *Database) GetDeviceByAccessToken(token string) (*authtypes.Device, erro // an error will be returned. // Returns the device on success. func (d *Database) CreateDevice(localpart, deviceID, accessToken string) (dev *authtypes.Device, returnErr error) { - returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error // Revoke existing token for this device if err = d.devices.deleteDevice(txn, deviceID, localpart); err != nil { @@ -74,30 +75,10 @@ func (d *Database) CreateDevice(localpart, deviceID, accessToken string) (dev *a // If the device doesn't exist, it will not return an error // If something went wrong during the deletion, it will return the SQL error func (d *Database) RemoveDevice(deviceID string, localpart string) error { - return runTransaction(d.db, func(txn *sql.Tx) error { + return common.WithTransaction(d.db, func(txn *sql.Tx) error { if err := d.devices.deleteDevice(txn, deviceID, localpart); err != sql.ErrNoRows { return err } return nil }) } - -// TODO: factor out to common -func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { - txn, err := db.Begin() - if err != nil { - return - } - defer func() { - if r := recover(); r != nil { - txn.Rollback() - panic(r) - } else if err != nil { - txn.Rollback() - } else { - err = txn.Commit() - } - }() - err = fn(txn) - return -} diff --git a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go index aea912c25..3b46487a2 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/clientapi/producers/roomserver.go @@ -48,7 +48,7 @@ func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event, sendAs for i, event := range events { ires[i] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.JSON(), + Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), } @@ -70,7 +70,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat for i, outlier := range outliers { ires[i] = api.InputRoomEvent{ Kind: api.KindOutlier, - Event: outlier.JSON(), + Event: outlier, AuthEventIDs: outlier.AuthEventIDs(), } eventIDs[i] = outlier.EventID() @@ -83,7 +83,7 @@ func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespStat ires[len(outliers)] = api.InputRoomEvent{ Kind: api.KindNew, - Event: event.JSON(), + Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, diff --git a/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go b/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go index ff949d3ee..a37c76856 100644 --- a/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/create-room-events/main.go @@ -21,12 +21,13 @@ import ( "encoding/json" "flag" "fmt" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" - "golang.org/x/crypto/ed25519" "os" "strings" "time" + + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" + "golang.org/x/crypto/ed25519" ) const usage = `Usage: %s @@ -131,7 +132,7 @@ func writeEvent(event gomatrixserverlib.Event) { if *format == "InputRoomEvent" { var ire api.InputRoomEvent ire.Kind = api.KindNew - ire.Event = event.JSON() + ire.Event = event authEventIDs := []string{} for _, ref := range b.AuthEvents { authEventIDs = append(authEventIDs, ref.EventID) diff --git a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go index 64d4ffc56..c4bea7f3e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/roomserver-integration-tests/main.go @@ -252,9 +252,9 @@ func main() { input := []string{ `{ - "AuthEventIDs": [], - "Kind": 1, - "Event": { + "auth_event_ids": [], + "kind": 1, + "event": { "origin": "matrix.org", "signatures": { "matrix.org": { @@ -274,10 +274,10 @@ func main() { "hashes": {"sha256": "Q05VLC8nztN2tguy+KnHxxhitI95wK9NelnsDaXRqeo"}, "type": "m.room.create"} }`, `{ - "AuthEventIDs": ["$1463671337126266wrSBX:matrix.org"], - "Kind": 2, - "StateEventIDs": ["$1463671337126266wrSBX:matrix.org"], - "Event": { + "auth_event_ids": ["$1463671337126266wrSBX:matrix.org"], + "kind": 2, + "state_event_ids": ["$1463671337126266wrSBX:matrix.org"], + "event": { "origin": "matrix.org", "signatures": { "matrix.org": { @@ -305,7 +305,7 @@ func main() { ]], "hashes": {"sha256": "t9t3sZV1Eu0P9Jyrs7pge6UTa1zuTbRdVxeUHnrQVH0"}, "type": "m.room.member"}, - "HasState": true + "has_state": true }`, } diff --git a/src/github.com/matrix-org/dendrite/common/httpapi.go b/src/github.com/matrix-org/dendrite/common/httpapi.go index 0ab33925f..6298c7b18 100644 --- a/src/github.com/matrix-org/dendrite/common/httpapi.go +++ b/src/github.com/matrix-org/dendrite/common/httpapi.go @@ -1,16 +1,16 @@ package common import ( + "net/http" + "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" - "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" - "net/http" ) // MakeAuthAPI turns a util.JSONRequestHandler function into an http.Handler which checks the access token in the request. -func MakeAuthAPI(metricsName string, deviceDB *devices.Database, f func(*http.Request, *authtypes.Device) util.JSONResponse) http.Handler { +func MakeAuthAPI(metricsName string, deviceDB auth.DeviceDatabase, f func(*http.Request, *authtypes.Device) util.JSONResponse) http.Handler { h := util.NewJSONRequestHandler(func(req *http.Request) util.JSONResponse { device, resErr := auth.VerifyAccessToken(req, deviceDB) if resErr != nil { diff --git a/src/github.com/matrix-org/dendrite/common/sql.go b/src/github.com/matrix-org/dendrite/common/sql.go new file mode 100644 index 000000000..cabbe6662 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/common/sql.go @@ -0,0 +1,41 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "database/sql" +) + +// WithTransaction runs a block of code passing in an SQL transaction +// If the code returns an error or panics then the transactions is rolledback +// Otherwise the transaction is committed. +func WithTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { + txn, err := db.Begin() + if err != nil { + return + } + defer func() { + if r := recover(); r != nil { + txn.Rollback() + panic(r) + } else if err != nil { + txn.Rollback() + } else { + err = txn.Commit() + } + }() + err = fn(txn) + return +} diff --git a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go index 2f98093e4..a10210076 100644 --- a/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/federationsender/storage/storage.go @@ -77,7 +77,7 @@ func (d *Database) UpdateRoom( addHosts []types.JoinedHost, removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { - err = runTransaction(d.db, func(txn *sql.Tx) error { + err = common.WithTransaction(d.db, func(txn *sql.Tx) error { if err = d.insertRoom(txn, roomID); err != nil { return err } @@ -105,22 +105,3 @@ func (d *Database) UpdateRoom( }) return } - -func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { - txn, err := db.Begin() - if err != nil { - return - } - defer func() { - if r := recover(); r != nil { - txn.Rollback() - panic(r) - } else if err != nil { - txn.Rollback() - } else { - err = txn.Commit() - } - }() - err = fn(txn) - return -} diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/input.go b/src/github.com/matrix-org/dendrite/roomserver/api/input.go index 4d576aefc..558eb28c4 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/input.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/input.go @@ -16,7 +16,9 @@ package api import ( - "encoding/json" + "net/http" + + "github.com/matrix-org/gomatrixserverlib" ) const ( @@ -25,19 +27,14 @@ const ( // These events are state events used to authenticate other events. // They can become part of the contiguous event graph via backfill. KindOutlier = 1 - // KindJoin event start a new contiguous event graph. The event must be a - // m.room.member event joining this server to the room. This must come with - // the state at the event. If the event is contiguous with the existing - // graph for the room then it is treated as a normal new event. - KindJoin = 2 // KindNew event extend the contiguous graph going forwards. // They usually don't need state, but may include state if the // there was a new event that references an event that we don't // have a copy of. - KindNew = 3 + KindNew = 2 // KindBackfill event extend the contiguous graph going backwards. // They always have state. - KindBackfill = 4 + KindBackfill = 3 ) // DoNotSendToOtherServers tells us not to send the event to other matrix @@ -49,77 +46,66 @@ const DoNotSendToOtherServers = "" type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. - Kind int + Kind int `json:"kind"` // The event JSON for the event to add. - Event []byte + Event gomatrixserverlib.Event `json:"event"` // List of state event IDs that authenticate this event. // These are likely derived from the "auth_events" JSON key of the event. // But can be different because the "auth_events" key can be incomplete or wrong. // For example many matrix events forget to reference the m.room.create event even though it is needed for auth. // (since synapse allows this to happen we have to allow it as well.) - AuthEventIDs []string + AuthEventIDs []string `json:"auth_event_ids"` // Whether the state is supplied as a list of event IDs or whether it // should be derived from the state at the previous events. - HasState bool + HasState bool `json:"has_state"` // Optional list of state event IDs forming the state before this event. // These state events must have already been persisted. // These are only used if HasState is true. // The list can be empty, for example when storing the first event in a room. - StateEventIDs []string + StateEventIDs []string `json:"state_event_ids"` // The server name to use to push this event to other servers. // Or empty if this event shouldn't be pushed to other servers. - SendAsServer string + SendAsServer string `json:"send_as_server"` } -// UnmarshalJSON implements json.Unmarshaller -func (ire *InputRoomEvent) UnmarshalJSON(data []byte) error { - // Create a struct rather than unmarshalling directly into the InputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - var content struct { - Kind int - Event *json.RawMessage - AuthEventIDs []string - StateEventIDs []string - HasState bool - SendAsServer string - } - if err := json.Unmarshal(data, &content); err != nil { - return err - } - ire.Kind = content.Kind - ire.AuthEventIDs = content.AuthEventIDs - ire.StateEventIDs = content.StateEventIDs - ire.HasState = content.HasState - ire.SendAsServer = content.SendAsServer - if content.Event != nil { - ire.Event = []byte(*content.Event) - } - return nil +// InputRoomEventsRequest is a request to InputRoomEvents +type InputRoomEventsRequest struct { + InputRoomEvents []InputRoomEvent `json:"input_room_events"` } -// MarshalJSON implements json.Marshaller -func (ire InputRoomEvent) MarshalJSON() ([]byte, error) { - // Create a struct rather than marshalling directly from the InputRoomEvent - // so that we can use json.RawMessage. - // We use json.RawMessage so that the event JSON is sent as JSON rather than - // being base64 encoded which is the default for []byte. - event := json.RawMessage(ire.Event) - content := struct { - Kind int - Event *json.RawMessage - AuthEventIDs []string - StateEventIDs []string - HasState bool - SendAsServer string - }{ - Kind: ire.Kind, - AuthEventIDs: ire.AuthEventIDs, - StateEventIDs: ire.StateEventIDs, - Event: &event, - HasState: ire.HasState, - SendAsServer: ire.SendAsServer, - } - return json.Marshal(&content) +// InputRoomEventsResponse is a response to InputRoomEvents +type InputRoomEventsResponse struct{} + +// RoomserverInputAPI is used to write events to the room server. +type RoomserverInputAPI interface { + InputRoomEvents( + request *InputRoomEventsRequest, + response *InputRoomEventsResponse, + ) error +} + +// RoomserverInputRoomEventsPath is the HTTP path for the InputRoomEvents API. +const RoomserverInputRoomEventsPath = "/api/roomserver/inputRoomEvents" + +// NewRoomserverInputAPIHTTP creates a RoomserverInputAPI implemented by talking to a HTTP POST API. +// If httpClient is nil then it uses the http.DefaultClient +func NewRoomserverInputAPIHTTP(roomserverURL string, httpClient *http.Client) RoomserverInputAPI { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &httpRoomserverInputAPI{roomserverURL, httpClient} +} + +type httpRoomserverInputAPI struct { + roomserverURL string + httpClient *http.Client +} + +// InputRoomEvents implements RoomserverInputAPI +func (h *httpRoomserverInputAPI) InputRoomEvents( + request *InputRoomEventsRequest, + response *InputRoomEventsResponse, +) error { + apiURL := h.roomserverURL + RoomserverInputRoomEventsPath + return postJSON(h.httpClient, apiURL, request, response) } diff --git a/src/github.com/matrix-org/dendrite/roomserver/api/query.go b/src/github.com/matrix-org/dendrite/roomserver/api/query.go index 9442f9f6a..6e6a838a9 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/api/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/api/query.go @@ -136,12 +136,12 @@ func NewRoomserverQueryAPIHTTP(roomserverURL string, httpClient *http.Client) Ro if httpClient == nil { httpClient = http.DefaultClient } - return &httpRoomserverQueryAPI{roomserverURL, *httpClient} + return &httpRoomserverQueryAPI{roomserverURL, httpClient} } type httpRoomserverQueryAPI struct { roomserverURL string - httpClient http.Client + httpClient *http.Client } // QueryLatestEventsAndState implements RoomserverQueryAPI @@ -171,7 +171,7 @@ func (h *httpRoomserverQueryAPI) QueryEventsByID( return postJSON(h.httpClient, apiURL, request, response) } -func postJSON(httpClient http.Client, apiURL string, request, response interface{}) error { +func postJSON(httpClient *http.Client, apiURL string, request, response interface{}) error { jsonBytes, err := json.Marshal(request) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/events.go b/src/github.com/matrix-org/dendrite/roomserver/input/events.go index acf302212..f8acff476 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/events.go @@ -49,10 +49,7 @@ type OutputRoomEventWriter interface { func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api.InputRoomEvent) error { // Parse and validate the event JSON - event, err := gomatrixserverlib.NewEventFromUntrustedJSON(input.Event) - if err != nil { - return err - } + event := input.Event // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(db, event, input.AuthEventIDs) @@ -79,8 +76,8 @@ func processRoomEvent(db RoomEventDatabase, ow OutputRoomEventWriter, input api. if input.HasState { // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. - entries, err := db.StateEntriesForEventIDs(input.StateEventIDs) - if err != nil { + var entries []types.StateEntry + if entries, err = db.StateEntriesForEventIDs(input.StateEventIDs); err != nil { return err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/input.go b/src/github.com/matrix-org/dendrite/roomserver/input/input.go new file mode 100644 index 000000000..ffbebd0c7 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/roomserver/input/input.go @@ -0,0 +1,107 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package input contains the code processes new room events +package input + +import ( + "encoding/json" + "fmt" + "sync/atomic" + + "net/http" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/util" + sarama "gopkg.in/Shopify/sarama.v1" +) + +// RoomserverInputAPI implements api.RoomserverInputAPI +type RoomserverInputAPI struct { + DB RoomEventDatabase + Producer sarama.SyncProducer + // The kafkaesque topic to output new room events to. + // This is the name used in kafka to identify the stream to write events to. + OutputRoomEventTopic string + // If non-nil then the API will stop processing messages after this + // many messages and will shutdown. Malformed messages are not in the count. + StopProcessingAfter *int64 + // If not-nil then the API will call this to shutdown the server. + // If this is nil then the API will continue to process messsages even + // though StopProcessingAfter has been reached. + ShutdownCallback func(reason string) + // How many messages the consumer has processed. + processed int64 +} + +// WriteOutputRoomEvent implements OutputRoomEventWriter +func (r *RoomserverInputAPI) WriteOutputRoomEvent(output api.OutputNewRoomEvent) error { + var m sarama.ProducerMessage + oe := api.OutputEvent{ + Type: api.OutputTypeNewRoomEvent, + NewRoomEvent: &output, + } + value, err := json.Marshal(oe) + if err != nil { + return err + } + m.Topic = r.OutputRoomEventTopic + m.Key = sarama.StringEncoder("") + m.Value = sarama.ByteEncoder(value) + _, _, err = r.Producer.SendMessage(&m) + return err +} + +// InputRoomEvents implements api.RoomserverInputAPI +func (r *RoomserverInputAPI) InputRoomEvents( + request *api.InputRoomEventsRequest, + response *api.InputRoomEventsResponse, +) error { + for i := range request.InputRoomEvents { + if err := processRoomEvent(r.DB, r, request.InputRoomEvents[i]); err != nil { + return err + } + // Update the number of processed messages using atomic addition because it is accessed from multiple goroutines. + processed := atomic.AddInt64(&r.processed, 1) + // Check if we should stop processing. + // Note that since we have multiple goroutines it's quite likely that we'll overshoot by a few messages. + // If we try to stop processing after M message and we have N goroutines then we will process somewhere + // between M and (N + M) messages because the N goroutines could all try to process what they think will be the + // last message. We could be more careful here but this is good enough for getting rough benchmarks. + if r.StopProcessingAfter != nil && processed >= int64(*r.StopProcessingAfter) { + if r.ShutdownCallback != nil { + r.ShutdownCallback(fmt.Sprintf("Stopping processing after %d messages", r.processed)) + } + } + } + return nil +} + +// SetupHTTP adds the RoomserverInputAPI handlers to the http.ServeMux. +func (r *RoomserverInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.RoomserverInputRoomEventsPath, + common.MakeAPI("inputRoomEvents", func(req *http.Request) util.JSONResponse { + var request api.InputRoomEventsRequest + var response api.InputRoomEventsResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(400, err.Error()) + } + if err := r.InputRoomEvents(&request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: 200, JSON: &response} + }), + ) +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 31ebc0222..142df90e3 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -16,13 +16,14 @@ package query import ( "encoding/json" + "net/http" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "net/http" ) // RoomserverQueryAPIDatabase has the storage APIs needed to implement the query API. @@ -173,7 +174,7 @@ func (r *RoomserverQueryAPI) loadEvents(eventNIDs []types.EventNID) ([]gomatrixs func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { servMux.Handle( api.RoomserverQueryLatestEventsAndStatePath, - common.MakeAPI("query_latest_events_and_state", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryLatestEventsAndState", func(req *http.Request) util.JSONResponse { var request api.QueryLatestEventsAndStateRequest var response api.QueryLatestEventsAndStateResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -187,7 +188,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { ) servMux.Handle( api.RoomserverQueryStateAfterEventsPath, - common.MakeAPI("query_state_after_events", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryStateAfterEvents", func(req *http.Request) util.JSONResponse { var request api.QueryStateAfterEventsRequest var response api.QueryStateAfterEventsResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -201,7 +202,7 @@ func (r *RoomserverQueryAPI) SetupHTTP(servMux *http.ServeMux) { ) servMux.Handle( api.RoomserverQueryEventsByIDPath, - common.MakeAPI("query_events_by_id", func(req *http.Request) util.JSONResponse { + common.MakeAPI("queryEventsByID", func(req *http.Request) util.JSONResponse { var request api.QueryEventsByIDRequest var response api.QueryEventsByIDResponse if err := json.NewDecoder(req.Body).Decode(&request); err != nil { diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index a1efa1f27..27afd1c05 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -92,7 +92,7 @@ func (d *SyncServerDatabase) Events(eventIDs []string) ([]gomatrixserverlib.Even func (d *SyncServerDatabase) WriteEvent( ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, ) (streamPos types.StreamPosition, returnErr error) { - returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error pos, err := d.events.insertEvent(txn, ev, addStateEventIDs, removeStateEventIDs) if err != nil { @@ -162,7 +162,7 @@ func (d *SyncServerDatabase) SyncStreamPosition() (types.StreamPosition, error) // IncrementalSync returns all the data needed in order to create an incremental sync response. func (d *SyncServerDatabase) IncrementalSync(userID string, fromPos, toPos types.StreamPosition, numRecentEventsPerRoom int) (res *types.Response, returnErr error) { - returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { // Work out which rooms to return in the response. This is done by getting not only the currently // joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions. // This works out what the 'state' key should be for each room as well as which membership block @@ -223,7 +223,7 @@ func (d *SyncServerDatabase) CompleteSync(userID string, numRecentEventsPerRoom // a consistent view of the database throughout. This includes extracting the sync stream position. // This does have the unfortunate side-effect that all the matrixy logic resides in this function, // but it's better to not hide the fact that this is being done in a transaction. - returnErr = runTransaction(d.db, func(txn *sql.Tx) error { + returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { // Get the current stream position which we will base the sync response on. id, err := d.events.selectMaxID(txn) if err != nil { @@ -479,22 +479,3 @@ func getMembershipFromEvent(ev *gomatrixserverlib.Event, userID string) string { } return "" } - -func runTransaction(db *sql.DB, fn func(txn *sql.Tx) error) (err error) { - txn, err := db.Begin() - if err != nil { - return - } - defer func() { - if r := recover(); r != nil { - txn.Rollback() - panic(r) - } else if err != nil { - txn.Rollback() - } else { - err = txn.Commit() - } - }() - err = fn(txn) - return -}