mirror of
https://github.com/matrix-org/dendrite.git
synced 2025-12-17 03:43:11 -06:00
remove more functions
This commit is contained in:
parent
37fd22ef8d
commit
de9a48f76d
|
|
@ -23,11 +23,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
|
|
||||||
// Import the sqlite3 package
|
// Import the sqlite3 package
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
|
@ -126,113 +123,6 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
|
|
||||||
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
|
||||||
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
|
||||||
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
|
||||||
if err := d.Database.BackwardExtremities.DeleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have all of the event's previous events. If an event is
|
|
||||||
// missing, add it to the room's backward extremities.
|
|
||||||
prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var found bool
|
|
||||||
for _, eID := range ev.PrevEventIDs() {
|
|
||||||
found = false
|
|
||||||
for _, prevEv := range prevEvents {
|
|
||||||
if eID == prevEv.EventID() {
|
|
||||||
found = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the event is missing, consider it a backward extremity.
|
|
||||||
if !found {
|
|
||||||
if err = d.Database.BackwardExtremities.InsertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteEvent into the database. It is not safe to call this function from multiple goroutines, as it would create races
|
|
||||||
// when generating the sync stream position for this event. Returns the sync stream position for the inserted event.
|
|
||||||
// Returns an error if there was a problem inserting this event.
|
|
||||||
func (d *SyncServerDatasource) WriteEvent(
|
|
||||||
ctx context.Context,
|
|
||||||
ev *gomatrixserverlib.HeaderedEvent,
|
|
||||||
addStateEvents []gomatrixserverlib.HeaderedEvent,
|
|
||||||
addStateEventIDs, removeStateEventIDs []string,
|
|
||||||
transactionID *api.TransactionID, excludeFromSync bool,
|
|
||||||
) (pduPosition types.StreamPosition, returnErr error) {
|
|
||||||
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
|
||||||
var err error
|
|
||||||
pos, err := d.Database.OutputEvents.InsertEvent(
|
|
||||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
pduPosition = pos
|
|
||||||
|
|
||||||
if err = d.Database.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
|
|
||||||
// Nothing to do, the event may have just been a message event.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
|
|
||||||
})
|
|
||||||
|
|
||||||
return pduPosition, returnErr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *SyncServerDatasource) updateRoomState(
|
|
||||||
ctx context.Context, txn *sql.Tx,
|
|
||||||
removedEventIDs []string,
|
|
||||||
addedEvents []gomatrixserverlib.HeaderedEvent,
|
|
||||||
pduPosition types.StreamPosition,
|
|
||||||
) error {
|
|
||||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
|
||||||
for _, eventID := range removedEventIDs {
|
|
||||||
if err := d.Database.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, event := range addedEvents {
|
|
||||||
if event.StateKey() == nil {
|
|
||||||
// ignore non state events
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var membership *string
|
|
||||||
if event.Type() == "m.room.member" {
|
|
||||||
value, err := event.Membership()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
membership = &value
|
|
||||||
}
|
|
||||||
if err := d.Database.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncPosition returns the latest positions for syncing.
|
// SyncPosition returns the latest positions for syncing.
|
||||||
func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
|
func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
|
||||||
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
|
||||||
|
|
@ -921,29 +811,6 @@ func (d *SyncServerDatasource) currentStateStreamEventsForRoom(
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamEventsToEvents converts streamEvent to Event. If device is non-nil and
|
|
||||||
// matches the streamevent.transactionID device then the transaction ID gets
|
|
||||||
// added to the unsigned section of the output event.
|
|
||||||
func (d *SyncServerDatasource) StreamEventsToEvents(device *authtypes.Device, in []types.StreamEvent) []gomatrixserverlib.HeaderedEvent {
|
|
||||||
out := make([]gomatrixserverlib.HeaderedEvent, len(in))
|
|
||||||
for i := 0; i < len(in); i++ {
|
|
||||||
out[i] = in[i].HeaderedEvent
|
|
||||||
if device != nil && in[i].TransactionID != nil {
|
|
||||||
if device.UserID == in[i].Sender() && device.SessionID == in[i].TransactionID.SessionID {
|
|
||||||
err := out[i].SetUnsignedField(
|
|
||||||
"transaction_id", in[i].TransactionID.TransactionID,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
logrus.WithFields(logrus.Fields{
|
|
||||||
"event_id": out[i].EventID(),
|
|
||||||
}).WithError(err).Warnf("Failed to add transaction ID to event")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
// There may be some overlap where events in stateEvents are already in recentEvents, so filter
|
||||||
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
// them out so we don't include them twice in the /sync response. They should be in recentEvents
|
||||||
// only, so clients get to the correct state once they have rolled forward.
|
// only, so clients get to the correct state once they have rolled forward.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue