Merge branch 'markjh/invitesV' of github.com:matrix-org/dendrite into markjh/invitesV

This commit is contained in:
Mark Haines 2017-09-20 11:08:12 +01:00
commit cedb04cad2
3 changed files with 34 additions and 21 deletions

View file

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"context"
"database/sql" "database/sql"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
@ -8,7 +9,7 @@ import (
const inviteEventsSchema = ` const inviteEventsSchema = `
CREATE TABLE IF NOT EXISTS syncapi_invite_events ( CREATE TABLE IF NOT EXISTS syncapi_invite_events (
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
room_id TEXT NOT NULL, room_id TEXT NOT NULL,
target_user_id TEXT NOT NULL, target_user_id TEXT NOT NULL,
@ -51,10 +52,13 @@ func (s *inviteEventsStatements) prepare(db *sql.DB) (err error) {
} }
func (s *inviteEventsStatements) insertInviteEvent( func (s *inviteEventsStatements) insertInviteEvent(
inviteEvent gomatrixserverlib.Event, ctx context.Context, inviteEvent gomatrixserverlib.Event,
) (streamPos int64, err error) { ) (streamPos int64, err error) {
err = s.insertInviteEventStmt.QueryRow( err = s.insertInviteEventStmt.QueryRowContext(
inviteEvent.RoomID(), inviteEvent.EventID, *inviteEvent.StateKey(), ctx,
inviteEvent.RoomID(),
inviteEvent.EventID(),
*inviteEvent.StateKey(),
inviteEvent.JSON(), inviteEvent.JSON(),
).Scan(&streamPos) ).Scan(&streamPos)
return return
@ -63,10 +67,10 @@ func (s *inviteEventsStatements) insertInviteEvent(
// selectInviteEventsInRange returns a map of room ID to invite event for the // selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range. // active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) selectInviteEventsInRange( func (s *inviteEventsStatements) selectInviteEventsInRange(
targetUserID string, startPos, endPos int64, ctx context.Context, targetUserID string, startPos, endPos int64,
) (map[string]gomatrixserverlib.Event, error) { ) (map[string]gomatrixserverlib.Event, error) {
rows, err := s.selectInviteEventsInRangeStmt.Query( rows, err := s.selectInviteEventsInRangeStmt.QueryContext(
targetUserID, startPos, endPos, ctx, targetUserID, startPos, endPos,
) )
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -30,11 +30,16 @@ const outputRoomEventsSchema = `
CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id; CREATE SEQUENCE IF NOT EXISTS syncapi_stream_id;
-- Stores output room events received from the roomserver. -- Stores output room events received from the roomserver.
CREATE SEQUENCE IF NOT EXISTS syncapi_output_room_event_id_seq
CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- An incrementing ID which denotes the position in the log that this event resides at. -- An incrementing ID which denotes the position in the log that this event resides at.
-- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments. -- NB: 'serial' makes no guarantees to increment by 1 every time, only that it increments.
-- This isn't a problem for us since we just want to order by this field. -- This isn't a problem for us since we just want to order by this field.
<<<<<<< HEAD
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'), id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
=======
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_output_room_event_id_seq'),
>>>>>>> 960083b099c96b25b0c81ea602b372470e9cf889
-- The event ID for the event -- The event ID for the event
event_id TEXT NOT NULL, event_id TEXT NOT NULL,
-- The 'room_id' key for the event. -- The 'room_id' key for the event.

View file

@ -47,32 +47,32 @@ type SyncServerDatabase struct {
accountData accountDataStatements accountData accountDataStatements
events outputRoomEventsStatements events outputRoomEventsStatements
roomstate currentRoomStateStatements roomstate currentRoomStateStatements
invites inviteEventsStatements
} }
// NewSyncServerDatabase creates a new sync server database // NewSyncServerDatabase creates a new sync server database
func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
var db *sql.DB var d SyncServerDatabase
var err error var err error
if db, err = sql.Open("postgres", dataSourceName); err != nil { if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err return nil, err
} }
partitions := common.PartitionOffsetStatements{} if err = d.partitions.Prepare(d.db, "syncapi"); err != nil {
if err = partitions.Prepare(db, "syncapi"); err != nil {
return nil, err return nil, err
} }
accountData := accountDataStatements{} if err = d.accountData.prepare(d.db); err != nil {
if err = accountData.prepare(db); err != nil {
return nil, err return nil, err
} }
events := outputRoomEventsStatements{} if err = d.events.prepare(d.db); err != nil {
if err = events.prepare(db); err != nil {
return nil, err return nil, err
} }
state := currentRoomStateStatements{} if err := d.roomstate.prepare(d.db); err != nil {
if err := state.prepare(db); err != nil {
return nil, err return nil, err
} }
return &SyncServerDatabase{db, partitions, accountData, events, state}, nil if err := d.invites.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
} }
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
@ -260,7 +260,7 @@ func (d *SyncServerDatabase) IncrementalSync(
} }
// TODO: This should be done in getStateDeltas // TODO: This should be done in getStateDeltas
if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { if err = d.addInvitesToResponse(ctx, txn, userID, fromPos, toPos, res); err != nil {
return nil, err return nil, err
} }
@ -322,7 +322,7 @@ func (d *SyncServerDatabase) CompleteSync(
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
} }
if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil { if err = d.addInvitesToResponse(ctx, txn, userID, 0, pos, res); err != nil {
return nil, err return nil, err
} }
@ -365,7 +365,11 @@ func (d *SyncServerDatabase) UpsertAccountData(
} }
func (d *SyncServerDatabase) addInvitesToResponse( func (d *SyncServerDatabase) addInvitesToResponse(
ctx context.Context, txn *sql.Tx, userID string, res *types.Response) error { ctx context.Context, txn *sql.Tx,
userID string,
_, _ types.StreamPosition,
res *types.Response,
) error {
// Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark. // Add invites - TODO: This will break over federation as they won't be in the current state table according to Mark.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "invite") roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "invite")
if err != nil { if err != nil {