mirror of
https://github.com/matrix-org/dendrite.git
synced 2026-01-16 10:33:11 -06:00
Switch to stdlib atomic
No need for `go.uber.org/atomic` anymore since 1.19 adds new atomic types
This commit is contained in:
parent
1755a2e88d
commit
5253a99bce
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||||
|
|
@ -30,7 +31,6 @@ import (
|
||||||
"github.com/matrix-org/gomatrix"
|
"github.com/matrix-org/gomatrix"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"go.uber.org/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -162,7 +162,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share
|
||||||
// requests to retry.
|
// requests to retry.
|
||||||
func (oq *destinationQueue) wakeQueueIfNeeded() {
|
func (oq *destinationQueue) wakeQueueIfNeeded() {
|
||||||
// If we are backing off then interrupt the backoff.
|
// If we are backing off then interrupt the backoff.
|
||||||
if oq.backingOff.CAS(true, false) {
|
if oq.backingOff.CompareAndSwap(true, false) {
|
||||||
oq.interruptBackoff <- true
|
oq.interruptBackoff <- true
|
||||||
}
|
}
|
||||||
// If we aren't running then wake up the queue.
|
// If we aren't running then wake up the queue.
|
||||||
|
|
@ -242,7 +242,7 @@ func (oq *destinationQueue) getPendingFromDatabase() {
|
||||||
func (oq *destinationQueue) backgroundSend() {
|
func (oq *destinationQueue) backgroundSend() {
|
||||||
// Check if a worker is already running, and if it isn't, then
|
// Check if a worker is already running, and if it isn't, then
|
||||||
// mark it as started.
|
// mark it as started.
|
||||||
if !oq.running.CAS(false, true) {
|
if !oq.running.CompareAndSwap(false, true) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
destinationQueueRunning.Inc()
|
destinationQueueRunning.Inc()
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,11 @@ package statistics
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"go.uber.org/atomic"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||||
)
|
)
|
||||||
|
|
@ -95,7 +95,7 @@ func (s *ServerStatistics) cancel() {
|
||||||
// we will unblacklist it.
|
// we will unblacklist it.
|
||||||
func (s *ServerStatistics) Success() {
|
func (s *ServerStatistics) Success() {
|
||||||
s.cancel()
|
s.cancel()
|
||||||
s.successCounter.Inc()
|
s.successCounter.Add(1)
|
||||||
s.backoffCount.Store(0)
|
s.backoffCount.Store(0)
|
||||||
if s.statistics.DB != nil {
|
if s.statistics.DB != nil {
|
||||||
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
|
||||||
|
|
@ -114,8 +114,8 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
|
||||||
// a new backoff period. Increase the failure counter and
|
// a new backoff period. Increase the failure counter and
|
||||||
// start a goroutine which will wait out the backoff and
|
// start a goroutine which will wait out the backoff and
|
||||||
// unset the backoffStarted flag when done.
|
// unset the backoffStarted flag when done.
|
||||||
if s.backoffStarted.CAS(false, true) {
|
if s.backoffStarted.CompareAndSwap(false, true) {
|
||||||
if s.backoffCount.Inc() >= s.statistics.FailuresUntilBlacklist {
|
if s.backoffCount.Add(1) >= s.statistics.FailuresUntilBlacklist {
|
||||||
s.blacklisted.Store(true)
|
s.blacklisted.Store(true)
|
||||||
if s.statistics.DB != nil {
|
if s.statistics.DB != nil {
|
||||||
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
|
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -42,7 +42,6 @@ require (
|
||||||
github.com/uber/jaeger-client-go v2.30.0+incompatible
|
github.com/uber/jaeger-client-go v2.30.0+incompatible
|
||||||
github.com/uber/jaeger-lib v2.4.1+incompatible
|
github.com/uber/jaeger-lib v2.4.1+incompatible
|
||||||
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
github.com/yggdrasil-network/yggdrasil-go v0.4.5-0.20220901155642-4f2abece817c
|
||||||
go.uber.org/atomic v1.9.0
|
|
||||||
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90
|
||||||
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
golang.org/x/image v0.0.0-20220413100746-70e8d0d3baa9
|
||||||
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
golang.org/x/mobile v0.0.0-20220722155234-aaac322e2105
|
||||||
|
|
@ -123,6 +122,7 @@ require (
|
||||||
github.com/tidwall/match v1.1.1 // indirect
|
github.com/tidwall/match v1.1.1 // indirect
|
||||||
github.com/tidwall/pretty v1.2.0 // indirect
|
github.com/tidwall/pretty v1.2.0 // indirect
|
||||||
go.etcd.io/bbolt v1.3.5 // indirect
|
go.etcd.io/bbolt v1.3.5 // indirect
|
||||||
|
go.uber.org/atomic v1.9.0 // indirect
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
||||||
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
golang.org/x/sys v0.0.0-20220906135438-9e1f76180b77 // indirect
|
||||||
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
golang.org/x/text v0.3.8-0.20211004125949-5bd84dd9b33b // indirect
|
||||||
|
|
|
||||||
|
|
@ -3,8 +3,7 @@ package sqlutil
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
"go.uber.org/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExclusiveWriter implements sqlutil.Writer.
|
// ExclusiveWriter implements sqlutil.Writer.
|
||||||
|
|
@ -57,7 +56,7 @@ func (w *ExclusiveWriter) Do(db *sql.DB, txn *sql.Tx, f func(txn *sql.Tx) error)
|
||||||
// opened using the database object from the task and then this will
|
// opened using the database object from the task and then this will
|
||||||
// be passed as a parameter to the task function.
|
// be passed as a parameter to the task function.
|
||||||
func (w *ExclusiveWriter) run() {
|
func (w *ExclusiveWriter) run() {
|
||||||
if !w.running.CAS(false, true) {
|
if !w.running.CompareAndSwap(false, true) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if tracingEnabled {
|
if tracingEnabled {
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -33,7 +34,6 @@ import (
|
||||||
sentryhttp "github.com/getsentry/sentry-go/http"
|
sentryhttp "github.com/getsentry/sentry-go/http"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"golang.org/x/net/http2"
|
"golang.org/x/net/http2"
|
||||||
"golang.org/x/net/http2/h2c"
|
"golang.org/x/net/http2/h2c"
|
||||||
|
|
||||||
|
|
@ -498,7 +498,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr)
|
logrus.Infof("Starting internal %s listener on %s", b.componentName, internalServ.Addr)
|
||||||
b.ProcessContext.ComponentStarted()
|
b.ProcessContext.ComponentStarted()
|
||||||
internalServ.RegisterOnShutdown(func() {
|
internalServ.RegisterOnShutdown(func() {
|
||||||
if internalShutdown.CAS(false, true) {
|
if internalShutdown.CompareAndSwap(false, true) {
|
||||||
b.ProcessContext.ComponentFinished()
|
b.ProcessContext.ComponentFinished()
|
||||||
logrus.Infof("Stopped internal HTTP listener")
|
logrus.Infof("Stopped internal HTTP listener")
|
||||||
}
|
}
|
||||||
|
|
@ -526,7 +526,7 @@ func (b *BaseDendrite) SetupAndServeHTTP(
|
||||||
logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr)
|
logrus.Infof("Starting external %s listener on %s", b.componentName, externalServ.Addr)
|
||||||
b.ProcessContext.ComponentStarted()
|
b.ProcessContext.ComponentStarted()
|
||||||
externalServ.RegisterOnShutdown(func() {
|
externalServ.RegisterOnShutdown(func() {
|
||||||
if externalShutdown.CAS(false, true) {
|
if externalShutdown.CompareAndSwap(false, true) {
|
||||||
b.ProcessContext.ComponentFinished()
|
b.ProcessContext.ComponentFinished()
|
||||||
logrus.Infof("Stopped external HTTP listener")
|
logrus.Infof("Stopped external HTTP listener")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/getsentry/sentry-go"
|
"github.com/getsentry/sentry-go"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"go.uber.org/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProcessContext struct {
|
type ProcessContext struct {
|
||||||
|
|
@ -51,7 +51,7 @@ func (b *ProcessContext) WaitForComponentsToFinish() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *ProcessContext) Degraded() {
|
func (b *ProcessContext) Degraded() {
|
||||||
if b.degraded.CAS(false, true) {
|
if b.degraded.CompareAndSwap(false, true) {
|
||||||
logrus.Warn("Dendrite is running in a degraded state")
|
logrus.Warn("Dendrite is running in a degraded state")
|
||||||
sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
|
sentry.CaptureException(fmt.Errorf("Process is running in a degraded state"))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/internal/caching"
|
"github.com/matrix-org/dendrite/internal/caching"
|
||||||
|
|
@ -18,7 +19,6 @@ import (
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/tidwall/gjson"
|
"github.com/tidwall/gjson"
|
||||||
"go.uber.org/atomic"
|
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue