Use HeaderedEvents in appservice component (#939)
* App service HeaderedEvents * Fix database queries * Fix lint error
This commit is contained in:
parent
951b5d5e68
commit
0b732d6f45
|
@ -101,11 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
"type": ev.Type(),
|
"type": ev.Type(),
|
||||||
}).Info("appservice received an event from roomserver")
|
}).Info("appservice received an event from roomserver")
|
||||||
|
|
||||||
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event)
|
missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
events := append(missingEvents, ev.Event)
|
events := append(missingEvents, ev)
|
||||||
|
|
||||||
// Send event to any relevant application services
|
// Send event to any relevant application services
|
||||||
return s.filterRoomserverEvents(context.TODO(), events)
|
return s.filterRoomserverEvents(context.TODO(), events)
|
||||||
|
@ -114,19 +114,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
||||||
// lookupMissingStateEvents looks up the state events that are added by a new event,
|
// lookupMissingStateEvents looks up the state events that are added by a new event,
|
||||||
// and returns any not already present.
|
// and returns any not already present.
|
||||||
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
|
func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
|
||||||
addsStateEventIDs []string, event gomatrixserverlib.Event,
|
addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent,
|
||||||
) ([]gomatrixserverlib.Event, error) {
|
) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
// Fast path if there aren't any new state events.
|
// Fast path if there aren't any new state events.
|
||||||
if len(addsStateEventIDs) == 0 {
|
if len(addsStateEventIDs) == 0 {
|
||||||
return []gomatrixserverlib.Event{}, nil
|
return []gomatrixserverlib.HeaderedEvent{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fast path if the only state event added is the event itself.
|
// Fast path if the only state event added is the event itself.
|
||||||
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() {
|
||||||
return []gomatrixserverlib.Event{}, nil
|
return []gomatrixserverlib.HeaderedEvent{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result := []gomatrixserverlib.Event{}
|
result := []gomatrixserverlib.HeaderedEvent{}
|
||||||
missing := []string{}
|
missing := []string{}
|
||||||
for _, id := range addsStateEventIDs {
|
for _, id := range addsStateEventIDs {
|
||||||
if id != event.EventID() {
|
if id != event.EventID() {
|
||||||
|
@ -143,9 +143,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, headeredEvent := range eventResp.Events {
|
result = append(result, eventResp.Events...)
|
||||||
result = append(result, headeredEvent.Event)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
@ -157,7 +155,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents(
|
||||||
// application service.
|
// application service.
|
||||||
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
events []gomatrixserverlib.Event,
|
events []gomatrixserverlib.HeaderedEvent,
|
||||||
) error {
|
) error {
|
||||||
for _, ws := range s.workerStates {
|
for _, ws := range s.workerStates {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
|
@ -180,7 +178,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
|
||||||
|
|
||||||
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
|
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
|
||||||
// event falls within one of a given application service's namespaces.
|
// event falls within one of a given application service's namespaces.
|
||||||
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool {
|
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
|
||||||
// No reason to queue events if they'll never be sent to the application
|
// No reason to queue events if they'll never be sent to the application
|
||||||
// service
|
// service
|
||||||
if appservice.URL == "" {
|
if appservice.URL == "" {
|
||||||
|
|
|
@ -21,8 +21,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database interface {
|
type Database interface {
|
||||||
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error
|
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
|
||||||
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error)
|
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
|
||||||
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
|
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)
|
||||||
UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
|
UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error
|
||||||
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
|
RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error
|
||||||
|
|
|
@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events (
|
||||||
-- The ID of the application service the event will be sent to
|
-- The ID of the application service the event will be sent to
|
||||||
as_id TEXT NOT NULL,
|
as_id TEXT NOT NULL,
|
||||||
-- JSON representation of the event
|
-- JSON representation of the event
|
||||||
event_json TEXT NOT NULL,
|
headered_event_json TEXT NOT NULL,
|
||||||
-- The ID of the transaction that this event is a part of
|
-- The ID of the transaction that this event is a part of
|
||||||
txn_id BIGINT NOT NULL
|
txn_id BIGINT NOT NULL
|
||||||
);
|
);
|
||||||
|
@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectEventsByApplicationServiceIDSQL = "" +
|
const selectEventsByApplicationServiceIDSQL = "" +
|
||||||
"SELECT id, event_json, txn_id " +
|
"SELECT id, headered_event_json, txn_id " +
|
||||||
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||||
|
|
||||||
const countEventsByApplicationServiceIDSQL = "" +
|
const countEventsByApplicationServiceIDSQL = "" +
|
||||||
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
|
"INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
|
||||||
"VALUES ($1, $2, $3)"
|
"VALUES ($1, $2, $3)"
|
||||||
|
|
||||||
const updateTxnIDForEventsSQL = "" +
|
const updateTxnIDForEventsSQL = "" +
|
||||||
|
@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||||
limit int,
|
limit int,
|
||||||
) (
|
) (
|
||||||
txnID, maxID int,
|
txnID, maxID int,
|
||||||
events []gomatrixserverlib.Event,
|
events []gomatrixserverlib.HeaderedEvent,
|
||||||
eventsRemaining bool,
|
eventsRemaining bool,
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
|
@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
|
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
|
||||||
// Get current time for use in calculating event age
|
// Get current time for use in calculating event age
|
||||||
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
|
||||||
// new ones. Send back those events first.
|
// new ones. Send back those events first.
|
||||||
lastTxnID := invalidTxnID
|
lastTxnID := invalidTxnID
|
||||||
for eventsProcessed := 0; eventRows.Next(); {
|
for eventsProcessed := 0; eventRows.Next(); {
|
||||||
var event gomatrixserverlib.Event
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
var eventJSON []byte
|
var eventJSON []byte
|
||||||
var id int
|
var id int
|
||||||
err = eventRows.Scan(
|
err = eventRows.Scan(
|
||||||
|
@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
|
||||||
func (s *eventsStatements) insertEvent(
|
func (s *eventsStatements) insertEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
// Convert event to JSON before inserting
|
// Convert event to JSON before inserting
|
||||||
eventJSON, err := json.Marshal(event)
|
eventJSON, err := json.Marshal(event)
|
||||||
|
|
|
@ -52,12 +52,12 @@ func (d *Database) prepare() error {
|
||||||
return d.txnID.prepare(d.db)
|
return d.txnID.prepare(d.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
|
||||||
// for a transaction worker to pull and later send to an application service.
|
// for a transaction worker to pull and later send to an application service.
|
||||||
func (d *Database) StoreEvent(
|
func (d *Database) StoreEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
) error {
|
) error {
|
||||||
return d.events.insertEvent(ctx, appServiceID, event)
|
return d.events.insertEvent(ctx, appServiceID, event)
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,7 @@ func (d *Database) GetEventsWithAppServiceID(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
limit int,
|
limit int,
|
||||||
) (int, int, []gomatrixserverlib.Event, bool, error) {
|
) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
|
||||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events (
|
||||||
-- The ID of the application service the event will be sent to
|
-- The ID of the application service the event will be sent to
|
||||||
as_id TEXT NOT NULL,
|
as_id TEXT NOT NULL,
|
||||||
-- JSON representation of the event
|
-- JSON representation of the event
|
||||||
event_json TEXT NOT NULL,
|
headered_event_json TEXT NOT NULL,
|
||||||
-- The ID of the transaction that this event is a part of
|
-- The ID of the transaction that this event is a part of
|
||||||
txn_id INTEGER NOT NULL
|
txn_id INTEGER NOT NULL
|
||||||
);
|
);
|
||||||
|
@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||||
`
|
`
|
||||||
|
|
||||||
const selectEventsByApplicationServiceIDSQL = "" +
|
const selectEventsByApplicationServiceIDSQL = "" +
|
||||||
"SELECT id, event_json, txn_id " +
|
"SELECT id, headered_event_json, txn_id " +
|
||||||
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC"
|
||||||
|
|
||||||
const countEventsByApplicationServiceIDSQL = "" +
|
const countEventsByApplicationServiceIDSQL = "" +
|
||||||
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
"SELECT COUNT(id) FROM appservice_events WHERE as_id = $1"
|
||||||
|
|
||||||
const insertEventSQL = "" +
|
const insertEventSQL = "" +
|
||||||
"INSERT INTO appservice_events(as_id, event_json, txn_id) " +
|
"INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " +
|
||||||
"VALUES ($1, $2, $3)"
|
"VALUES ($1, $2, $3)"
|
||||||
|
|
||||||
const updateTxnIDForEventsSQL = "" +
|
const updateTxnIDForEventsSQL = "" +
|
||||||
|
@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||||
limit int,
|
limit int,
|
||||||
) (
|
) (
|
||||||
txnID, maxID int,
|
txnID, maxID int,
|
||||||
events []gomatrixserverlib.Event,
|
events []gomatrixserverlib.HeaderedEvent,
|
||||||
eventsRemaining bool,
|
eventsRemaining bool,
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
|
@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) {
|
func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) {
|
||||||
// Get current time for use in calculating event age
|
// Get current time for use in calculating event age
|
||||||
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.
|
||||||
// new ones. Send back those events first.
|
// new ones. Send back those events first.
|
||||||
lastTxnID := invalidTxnID
|
lastTxnID := invalidTxnID
|
||||||
for eventsProcessed := 0; eventRows.Next(); {
|
for eventsProcessed := 0; eventRows.Next(); {
|
||||||
var event gomatrixserverlib.Event
|
var event gomatrixserverlib.HeaderedEvent
|
||||||
var eventJSON []byte
|
var eventJSON []byte
|
||||||
var id int
|
var id int
|
||||||
err = eventRows.Scan(
|
err = eventRows.Scan(
|
||||||
|
@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
|
||||||
func (s *eventsStatements) insertEvent(
|
func (s *eventsStatements) insertEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
// Convert event to JSON before inserting
|
// Convert event to JSON before inserting
|
||||||
eventJSON, err := json.Marshal(event)
|
eventJSON, err := json.Marshal(event)
|
||||||
|
|
|
@ -53,12 +53,12 @@ func (d *Database) prepare() error {
|
||||||
return d.txnID.prepare(d.db)
|
return d.txnID.prepare(d.db)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database
|
// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database
|
||||||
// for a transaction worker to pull and later send to an application service.
|
// for a transaction worker to pull and later send to an application service.
|
||||||
func (d *Database) StoreEvent(
|
func (d *Database) StoreEvent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
event *gomatrixserverlib.Event,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
) error {
|
) error {
|
||||||
return d.events.insertEvent(ctx, appServiceID, event)
|
return d.events.insertEvent(ctx, appServiceID, event)
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ func (d *Database) GetEventsWithAppServiceID(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
appServiceID string,
|
appServiceID string,
|
||||||
limit int,
|
limit int,
|
||||||
) (int, int, []gomatrixserverlib.Event, bool, error) {
|
) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) {
|
||||||
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -181,9 +181,14 @@ func createTransaction(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ev []gomatrixserverlib.Event
|
||||||
|
for _, e := range events {
|
||||||
|
ev = append(ev, e.Event)
|
||||||
|
}
|
||||||
|
|
||||||
// Create a transaction and store the events inside
|
// Create a transaction and store the events inside
|
||||||
transaction := gomatrixserverlib.ApplicationServiceTransaction{
|
transaction := gomatrixserverlib.ApplicationServiceTransaction{
|
||||||
Events: events,
|
Events: ev,
|
||||||
}
|
}
|
||||||
|
|
||||||
transactionJSON, err = json.Marshal(transaction)
|
transactionJSON, err = json.Marshal(transaction)
|
||||||
|
|
Loading…
Reference in a new issue