diff --git a/.github/workflows/dendrite.yml b/.github/workflows/dendrite.yml
index d337865f2..715235d3d 100644
--- a/.github/workflows/dendrite.yml
+++ b/.github/workflows/dendrite.yml
@@ -76,7 +76,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        go: [ '1.16', '1.17', '1.18' ]
+        go: ["1.16", "1.17", "1.18"]
     steps:
       - uses: actions/checkout@v3
       - name: Setup go
@@ -101,9 +101,9 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        go: [ '1.16', '1.17', '1.18' ]
-        goos: [ 'linux' ]
-        goarch: [ 'amd64', '386' ]
+        go: ["1.16", "1.17", "1.18"]
+        goos: ["linux"]
+        goarch: ["amd64", "386"]
     steps:
       - uses: actions/checkout@v3
       - name: Setup go
@@ -134,9 +134,9 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
      matrix:
-        go: [ '1.16', '1.17', '1.18' ]
-        goos: [ 'windows' ]
-        goarch: [ 'amd64' ]
+        go: ["1.16", "1.17", "1.18"]
+        goos: ["windows"]
+        goarch: ["amd64"]
     steps:
       - uses: actions/checkout@v3
       - name: Setup Go ${{ matrix.go }}
@@ -163,7 +163,7 @@ jobs:
   # Dummy step to gate other tests on without repeating the whole list
   initial-tests-done:
     name: Initial tests passed
-    needs: [ lint, test, build, build_windows ]
+    needs: [lint, test, build, build_windows]
     runs-on: ubuntu-latest
     if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
     steps:
@@ -183,7 +183,7 @@ jobs:
       - name: Setup go
         uses: actions/setup-go@v2
         with:
-          go-version: '1.16'
+          go-version: "1.16"
       - uses: actions/cache@v3
         with:
          path: |
@@ -232,7 +232,7 @@ jobs:
         working-directory: /src
       - name: Summarise results.tap
         if: ${{ always() }}
-        run: /sytest/scripts/tap_to_gha.pl /logs/results.tap 
+        run: /sytest/scripts/tap_to_gha.pl /logs/results.tap
 
       - name: Upload Sytest logs
         uses: actions/upload-artifact@v2
@@ -322,3 +322,16 @@ jobs:
           COMPLEMENT_BASE_IMAGE: complement-dendrite:latest
           API: ${{ matrix.api && 1 }}
         working-directory: complement
+
+  update-docker-images:
+    name: Update Docker images
+    if: github.repository == 'matrix-org/dendrite' && github.ref_name == 'main'
+    needs: [initial-tests-done, upgrade_test, sytest, complement]
+    runs-on: ubuntu-latest
+    steps:
+      - name: Check integration tests passed
+        uses: re-actors/alls-green@release/v1
+        with:
+          jobs: ${{ toJSON(needs) }}
+      - name: Dispatch Docker build
+        uses: ./.github/workflows/docker.yml
diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 2c5d9e5e6..129a80487 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -5,11 +5,8 @@ name: "Docker"
 on:
   release: # A GitHub release was published
     types: [published]
-  workflow_run: # The Dendrite pipeline completed successfully on main
-    workflows: [Dendrite]
-    types: [completed]
-    branches: [main]
   workflow_dispatch: # A build was manually requested
+  workflow_call: # Another pipeline called us
 
 env:
   DOCKER_NAMESPACE: matrixdotorg
@@ -21,6 +18,9 @@ jobs:
   monolith:
     name: Monolith image
     runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -40,13 +40,11 @@ jobs:
         uses: docker/login-action@v1
         with:
           registry: ghcr.io
-          username: ${{ github.actor }}
+          username: ${{ github.repository_owner }}
           password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: Build monolith image
-        if: >-
-          (github.event_name == 'workflow_run' && github.event.workflow_run.conclusion == 'success') ||
-          github.event_name == 'workflow_dispatch'
+      - name: Build main monolith image
+        if: github.ref_name == 'main'
         id: docker_build_monolith
         uses: docker/build-push-action@v2
         with:
@@ -80,6 +78,9 @@ jobs:
   polylith:
     name: Polylith image
     runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -99,13 +100,11 @@ jobs:
         uses: docker/login-action@v1
         with:
           registry: ghcr.io
-          username: ${{ github.actor }}
+          username: ${{ github.repository_owner }}
           password: ${{ secrets.GITHUB_TOKEN }}
 
-      - name: Build polylith image
-        if: >-
-          (github.event_name == 'workflow_run' && github.event.workflow_run.conclusion == 'success') ||
-          github.event_name == 'workflow_dispatch'
+      - name: Build main polylith image
+        if: github.ref_name == 'main'
         id: docker_build_polylith
         uses: docker/build-push-action@v2
         with:
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 748c191b0..328cf9155 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,11 +1,13 @@
 package jetstream
 
 import (
+	"fmt"
 	"reflect"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/getsentry/sentry-go"
 	"github.com/matrix-org/dendrite/setup/config"
 	"github.com/matrix-org/dendrite/setup/process"
 	"github.com/sirupsen/logrus"
@@ -20,7 +22,7 @@ var natsServerMutex sync.Mutex
 func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient.JetStreamContext, *natsclient.Conn) {
 	// check if we need an in-process NATS Server
 	if len(cfg.Addresses) != 0 {
-		return setupNATS(cfg, nil)
+		return setupNATS(process, cfg, nil)
 	}
 	natsServerMutex.Lock()
 	if natsServer == nil {
@@ -56,10 +58,10 @@ func Prepare(process *process.ProcessContext, cfg *config.JetStream) (natsclient
 	if err != nil {
 		logrus.Fatalln("Failed to create NATS client")
 	}
-	return setupNATS(cfg, nc)
+	return setupNATS(process, cfg, nc)
 }
 
-func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
+func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) {
 	if nc == nil {
 		var err error
 		nc, err = natsclient.Connect(strings.Join(cfg.Addresses, ","))
@@ -117,7 +119,40 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
 			namespaced.Name = name
 			namespaced.Subjects = subjects
 			if _, err = s.AddStream(&namespaced); err != nil {
-				logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
+				logger := logrus.WithError(err).WithFields(logrus.Fields{
+					"stream":   namespaced.Name,
+					"subjects": namespaced.Subjects,
+				})
+
+				// If the stream was supposed to be in-memory to begin with
+				// then an error here is fatal so we'll give up.
+				if namespaced.Storage == natsclient.MemoryStorage {
+					logger.WithError(err).Fatal("Unable to add in-memory stream")
+				}
+
+				// The stream was supposed to be on disk. Let's try starting
+				// Dendrite with the stream in-memory instead. That'll mean that
+				// we can't recover anything that was queued on the disk but we
+				// will still be able to start and run hopefully in the meantime.
+				logger.WithError(err).Error("Unable to add stream")
+				sentry.CaptureException(fmt.Errorf("Unable to add stream %q: %w", namespaced.Name, err))
+
+				namespaced.Storage = natsclient.MemoryStorage
+				if _, err = s.AddStream(&namespaced); err != nil {
+					// We tried to add the stream in-memory instead but something
+					// went wrong. That's an unrecoverable situation so we will
+					// give up at this point.
+					logger.WithError(err).Fatal("Unable to add in-memory stream")
+				}
+
+				if stream.Storage != namespaced.Storage {
+					// We've managed to add the stream in memory. What's on the
+					// disk will be left alone, but our ability to recover from a
+					// future crash will be limited. Yell about it.
+					sentry.CaptureException(fmt.Errorf("Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible", namespaced.Name))
+					logrus.Warn("Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible")
+					process.Degraded()
+				}
 			}
 		}
 	}
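The jetstream change above falls back from a file-backed stream to an in-memory one when AddStream fails, reports to Sentry, and flags the process as degraded. For readers unfamiliar with the JetStream client, here is a minimal standalone sketch of just the storage fallback, written against github.com/nats-io/nats.go directly; the helper name addStreamWithFallback and the example stream name are illustrative and not part of this patch.

```go
package main

import (
	"log"

	nats "github.com/nats-io/nats.go"
)

// addStreamWithFallback tries to create a stream with its configured storage.
// If that fails and the stream was meant to be file-backed, it retries with
// in-memory storage so the process can still start, reporting whether the
// fallback was used. This mirrors the pattern in setup/jetstream/nats.go.
func addStreamWithFallback(js nats.JetStreamContext, cfg nats.StreamConfig) (fellBack bool, err error) {
	if _, err = js.AddStream(&cfg); err == nil {
		return false, nil
	}
	if cfg.Storage == nats.MemoryStorage {
		// Already in-memory; there is nothing softer to fall back to.
		return false, err
	}
	cfg.Storage = nats.MemoryStorage
	if _, err = js.AddStream(&cfg); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// "OutputRoomEvent" is only an example stream name here.
	fellBack, err := addStreamWithFallback(js, nats.StreamConfig{
		Name:     "OutputRoomEvent",
		Subjects: []string{"OutputRoomEvent"},
		Storage:  nats.FileStorage,
	})
	if err != nil {
		log.Fatal(err)
	}
	if fellBack {
		log.Println("stream is running in-memory; durability is reduced")
	}
}
```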
diff --git a/setup/process/process.go b/setup/process/process.go
index d55751d77..01eb26e22 100644
--- a/setup/process/process.go
+++ b/setup/process/process.go
@@ -2,13 +2,19 @@ package process
 
 import (
 	"context"
+	"fmt"
 	"sync"
+
+	"github.com/getsentry/sentry-go"
+	"github.com/sirupsen/logrus"
+	"go.uber.org/atomic"
 )
 
 type ProcessContext struct {
 	wg       *sync.WaitGroup    // used to wait for components to shutdown
 	ctx      context.Context    // cancelled when Stop is called
 	shutdown context.CancelFunc // shut down Dendrite
+	degraded atomic.Bool
 }
 
 func NewProcessContext() *ProcessContext {
@@ -43,3 +49,14 @@ func (b *ProcessContext) WaitForShutdown() <-chan struct{} {
 func (b *ProcessContext) WaitForComponentsToFinish() {
 	b.wg.Wait()
 }
+
+func (b *ProcessContext) Degraded() {
+	if b.degraded.CAS(false, true) {
+		logrus.Warn("Dendrite is running in a degraded state")
+		sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
+	}
+}
+
+func (b *ProcessContext) IsDegraded() bool {
+	return b.degraded.Load()
+}
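The new Degraded() and IsDegraded() methods only record and expose the flag; this patch does not wire IsDegraded() into anything else. A hypothetical consumer might look like the sketch below, which assumes only the API added above and the Dendrite module being available; the /health endpoint and newHealthHandler are invented for illustration.

```go
package main

import (
	"net/http"

	"github.com/matrix-org/dendrite/setup/process"
)

// newHealthHandler is a hypothetical consumer of the new API: it reports 200
// while the process is healthy and 503 once something has called Degraded().
func newHealthHandler(p *process.ProcessContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if p.IsDegraded() {
			http.Error(w, "degraded", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

func main() {
	p := process.NewProcessContext()
	// Something (for example the JetStream setup above) flags the process.
	p.Degraded()

	http.Handle("/health", newHealthHandler(p))
	_ = http.ListenAndServe(":8008", nil)
}
```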
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 1afcbe750..ccdac0864 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -147,7 +147,6 @@ func (p *PDUStreamProvider) IncrementalSync(
 		To:        to,
 		Backwards: from > to,
 	}
-	newPos = to
 
 	var err error
 	var stateDeltas []types.StateDelta
@@ -172,14 +171,26 @@ func (p *PDUStreamProvider) IncrementalSync(
 		req.Rooms[roomID] = gomatrixserverlib.Join
 	}
 
+	if len(stateDeltas) == 0 {
+		return to
+	}
+
+	newPos = from
 	for _, delta := range stateDeltas {
-		if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+		var pos types.StreamPosition
+		if pos, err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
 			req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
-			return newPos
+			return to
+		}
+		switch {
+		case r.Backwards && pos < newPos:
+			fallthrough
+		case !r.Backwards && pos > newPos:
+			newPos = pos
 		}
 	}
 
-	return r.To
+	return newPos
 }
 
 func (p *PDUStreamProvider) addRoomDeltaToResponse(
@@ -189,7 +200,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
 	delta types.StateDelta,
 	eventFilter *gomatrixserverlib.RoomEventFilter,
 	res *types.Response,
-) error {
+) (types.StreamPosition, error) {
 	if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
 		// make sure we don't leak recent events after the leave event.
 		// TODO: History visibility makes this somewhat complex to handle correctly. For example:
@@ -204,19 +215,42 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
 		eventFilter, true, true,
 	)
 	if err != nil {
-		return err
+		return r.From, err
 	}
 	recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
 	delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
 	prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
 	if err != nil {
-		return err
+		return r.From, err
 	}
 
-	// XXX: should we ever get this far if we have no recent events or state in this room?
-	// in practice we do for peeks, but possibly not joins?
+	// If we didn't return any events at all then don't bother doing anything else.
 	if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
-		return nil
+		return r.To, nil
+	}
+
+	// Sort the events so that we can pick out the latest events from both sections.
+	recentEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(recentEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
+	delta.StateEvents = gomatrixserverlib.HeaderedReverseTopologicalOrdering(delta.StateEvents, gomatrixserverlib.TopologicalOrderByAuthEvents)
+
+	// Work out what the highest stream position is for all of the events in this
+	// room that were returned.
+	latestPosition := r.To
+	updateLatestPosition := func(mostRecentEventID string) {
+		if _, pos, err := p.DB.PositionInTopology(ctx, mostRecentEventID); err == nil {
+			switch {
+			case r.Backwards && pos > latestPosition:
+				fallthrough
+			case !r.Backwards && pos < latestPosition:
+				latestPosition = pos
+			}
+		}
+	}
+	if len(recentEvents) > 0 {
+		updateLatestPosition(recentEvents[len(recentEvents)-1].EventID())
+	}
+	if len(delta.StateEvents) > 0 {
+		updateLatestPosition(delta.StateEvents[len(delta.StateEvents)-1].EventID())
 	}
 
 	switch delta.Membership {
@@ -250,7 +284,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
 		res.Rooms.Leave[delta.RoomID] = *lr
 	}
 
-	return nil
+	return latestPosition, nil
 }
 
 func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
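The IncrementalSync change above replaces the unconditional return of r.To with a position derived from the per-room results, picked with a direction-aware comparison. The toy sketch below isolates that comparison; StreamPosition and advance are stand-ins for illustration, not Dendrite code.

```go
package main

import "fmt"

// StreamPosition stands in for Dendrite's types.StreamPosition.
type StreamPosition int64

// advance mirrors the direction-aware comparison the patch adds to
// IncrementalSync: starting from the request's from position, it keeps the
// largest per-room position when syncing forwards and the smallest when
// syncing backwards, so the returned token reflects what was actually
// included rather than unconditionally jumping to the requested to position.
func advance(backwards bool, current, pos StreamPosition) StreamPosition {
	switch {
	case backwards && pos < current:
		return pos
	case !backwards && pos > current:
		return pos
	default:
		return current
	}
}

func main() {
	newPos := StreamPosition(10) // the request's from token
	for _, pos := range []StreamPosition{14, 12, 17} { // per-room results
		newPos = advance(false, newPos, pos) // forwards sync
	}
	fmt.Println(newPos) // 17
}
```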