Merge branch 'main' into s7evink/createaccfix

This commit is contained in:
S7evinK 2022-03-25 14:15:49 +01:00 committed by GitHub
commit a7c95377fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 39 deletions

View file

@ -76,7 +76,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [ '1.16', '1.17', '1.18' ]
go: ["1.16", "1.17", "1.18"]
steps:
- uses: actions/checkout@v3
- name: Setup go
@ -101,9 +101,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [ '1.16', '1.17', '1.18' ]
goos: [ 'linux' ]
goarch: [ 'amd64', '386' ]
go: ["1.16", "1.17", "1.18"]
goos: ["linux"]
goarch: ["amd64", "386"]
steps:
- uses: actions/checkout@v3
- name: Setup go
@ -134,9 +134,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.16', '1.17', '1.18' ]
goos: [ 'windows' ]
goarch: [ 'amd64' ]
go: ["1.16", "1.17", "1.18"]
goos: ["windows"]
goarch: ["amd64"]
steps:
- uses: actions/checkout@v3
- name: Setup Go ${{ matrix.go }}
@ -163,7 +163,7 @@ jobs:
# Dummy step to gate other tests on without repeating the whole list
initial-tests-done:
name: Initial tests passed
needs: [ lint, test, build, build_windows ]
needs: [lint, test, build, build_windows]
runs-on: ubuntu-latest
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
steps:
@ -183,7 +183,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v2
with:
go-version: '1.16'
go-version: "1.16"
- uses: actions/cache@v3
with:
path: |
@ -232,7 +232,7 @@ jobs:
working-directory: /src
- name: Summarise results.tap
if: ${{ always() }}
run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
- name: Upload Sytest logs
uses: actions/upload-artifact@v2
@ -322,3 +322,16 @@ jobs:
COMPLEMENT_BASE_IMAGE: complement-dendrite:latest
API: ${{ matrix.api && 1 }}
working-directory: complement
update-docker-images:
name: Update Docker images
if: github.repository == 'matrix-org/dendrite' && github.ref_name == 'main'
needs: [initial-tests-done, upgrade_test, sytest, complement]
runs-on: ubuntu-latest
steps:
- name: Check integration tests passed
uses: re-actors/alls-green@release/v1
with:
jobs: ${{ toJSON(needs) }}
- name: Dispatch Docker build
uses: ./.github/workflows/dendrite.yml

View file

@ -5,11 +5,8 @@ name: "Docker"
on:
release: # A GitHub release was published
types: [published]
workflow_run: # The Dendrite pipeline completed successfully on main
workflows: [Dendrite]
types: [completed]
branches: [main]
workflow_dispatch: # A build was manually requested
workflow_call: # Another pipeline called us
env:
DOCKER_NAMESPACE: matrixdotorg
@ -21,6 +18,9 @@ jobs:
monolith:
name: Monolith image
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout
uses: actions/checkout@v2
@ -40,13 +40,11 @@ jobs:
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.actor }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build monolith image
if: >-
(github.event_name == 'workflow_run' && github.event.workflow_run.conclusion == 'success') ||
github.event_name == 'workflow_dispatch'
- name: Build main monolith image
if: github.ref_name == 'main'
id: docker_build_monolith
uses: docker/build-push-action@v2
with:
@ -80,6 +78,9 @@ jobs:
polylith:
name: Polylith image
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout
uses: actions/checkout@v2
@ -99,13 +100,11 @@ jobs:
uses: docker/login-action@v1
with:
registry: ghcr.io
username: ${{ github.actor }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build polylith image
if: >-
(github.event_name == 'workflow_run' && github.event.workflow_run.conclusion == 'success') ||
github.event_name == 'workflow_dispatch'
- name: Build main polylith image
if: github.ref_name == 'main'
id: docker_build_polylith
uses: docker/build-push-action@v2
with:

View file

@ -1,11 +1,13 @@
package jetstream
import (
"fmt"
"reflect"
"strings"
"sync"
"time"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/sirupsen/logrus"
@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex
func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
return setupNATS(process, cfg, nil)
}
natsServerMutex.Lock()
if natsServer == nil {
@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}
return setupNATS(cfg, nc)
return setupNATS(process, cfg, nc)
}
func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
if nc == nil {
var err error
nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
namespaced.Name = name
namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
logger := logrus.WithError(err).WithFields(logrus.Fields{
"stream": namespaced.Name,
"subjects": namespaced.Subjects,
})
// If the stream was supposed to be in-memory to begin with
// then an error here is fatal so we'll give up.
if namespaced.Storage == natsclient.MemoryStorage {
logger.WithError(err).Fatal("Unable to add in-memory stream")
}
// The stream was supposed to be on disk. Let's try starting
// Dendrite with the stream in-memory instead. That'll mean that
// we can't recover anything that was queued on the disk but we
// will still be able to start and run hopefully in the meantime.
logger.WithError(err).Error("Unable to add stream")
sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
namespaced.Storage = natsclient.MemoryStorage
if _, err = s.AddStream(&namespaced); err != nil {
// We tried to add the stream in-memory instead but something
// went wrong. That's an unrecoverable situation so we will
// give up at this point.
logger.WithError(err).Fatal("Unable to add in-memory stream")
}
if stream.Storage != namespaced.Storage {
// We've managed to add the stream in memory. What's on the
// disk will be left alone, but our ability to recover from a
// future crash will be limited. Yell about it.
sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name))
logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible")
process.Degraded()
}
}
}
}

View file

@ -2,13 +2,19 @@ package process
import (
"context"
"fmt"
"sync"
"github.com/getsentry/sentry-go"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
type ProcessContext struct {
wg *sync.WaitGroup // used to wait for components to shutdown
ctx context.Context // cancelled when Stop is called
shutdown context.CancelFunc // shut down Dendrite
degraded atomic.Bool
}
func NewProcessContext() *ProcessContext {
@ -43,3 +49,14 @@ func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
func (b *ProcessContext) WaitForComponentsToFinish() {
b.wg.Wait()
}
func (b *ProcessContext) Degraded() {
if b.degraded.CAS(false, true) {
logrus.Warn("Dendrite is running in a degraded state")
sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
}
}
func (b *ProcessContext) IsDegraded() bool {
return b.degraded.Load()
}

View file

@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync(
To: to,
Backwards: from > to,
}
newPos = to
var err error
var stateDeltas []types.StateDelta
@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync(
req.Rooms[roomID] = gomatrixserverlib.Join
}
if len(stateDeltas) == 0 {
return to
}
newPos = from
for _, delta := range stateDeltas {
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
var pos types.StreamPosition
if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos
return to
}
switch {
case r.Backwards && pos < newPos:
fallthrough
case !r.Backwards && pos > newPos:
newPos = pos
}
}
return r.To
return newPos
}
func (p *PDUStreamProvider) addRoomDeltaToResponse(
@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
delta types.StateDelta,
eventFilter *gomatrixserverlib.RoomEventFilter,
res *types.Response,
) error {
) (types.StreamPosition, error) {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
eventFilter, true, true,
)
if err != nil {
return err
return r.From, err
}
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
if err != nil {
return err
return r.From, err
}
// XXX: should we ever get this far if we have no recent events or state in this room?
// in practice we do for peeks, but possibly not joins?
// If we didn't return any events at all then don't bother doing anything else.
if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
return nil
return r.To, nil
}
// Sort the events so that we can pick out the latest events from both sections.
recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents)
// Work out what the highest stream position is for all of the events in this
// room that were returned.
latestPosition := r.To
updateLatestPosition := func(mostRecentEventID string) {
if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
switch {
case r.Backwards && pos > latestPosition:
fallthrough
case !r.Backwards && pos < latestPosition:
latestPosition = pos
}
}
}
if len(recentEvents) > 0 {
updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
}
if len(delta.StateEvents) > 0 {
updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
}
switch delta.Membership {
@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
res.Rooms.Leave[delta.RoomID] = *lr
}
return nil
return latestPosition, nil
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(