blob: 4fb4f8ed008325932af588db0cbea540bf6b3749 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
package discovery
import (
"context"
"log/slog"
"sync"
)
// Manager is the discovery struct that is cached
// with some bookkeeping to avoid race conditions
type Manager struct {
disco *Discovery
cancel context.CancelFunc
mu sync.RWMutex
wait sync.WaitGroup
}
// NewManager creates a new Discovery manager
func NewManager(disco *Discovery) *Manager {
return &Manager{disco: disco}
}
func (m *Manager) lock(write bool) {
if write {
m.mu.Lock()
return
}
m.mu.RLock()
}
func (m *Manager) unlock(write bool) {
if write {
m.mu.Unlock()
return
}
m.mu.RUnlock()
}
// Discovery gets the cached discovery
// `write` is true if discovery will be written to
func (m *Manager) Discovery(write bool) (*Discovery, func()) {
if write {
m.wait.Wait()
}
m.lock(write)
return m.disco, func() {
m.unlock(write)
}
}
// Cancel aborts the running discovery startup process
func (m *Manager) Cancel() {
if m.cancel != nil {
m.cancel()
}
m.wait.Wait()
}
// Startup handles the discovery process in the background
// It's called Startup because it's called when the lib is initialised
func (m *Manager) Startup(ctx context.Context, cb func()) {
ctx, cancel := context.WithCancel(ctx)
m.cancel = cancel
m.wait.Add(1)
go func() {
m.lock(false)
discoCopy, err := m.disco.Copy()
if err != nil {
slog.Warn("failed to clone discovery", "error", err)
return
}
m.unlock(false)
// we already log the warning
discoCopy.Servers(ctx, false) //nolint:errcheck
m.lock(true)
m.disco.UpdateServers(discoCopy.ServerList)
m.unlock(true)
m.wait.Done()
select {
case <-ctx.Done():
return
default:
if cb == nil {
return
}
cb()
}
}()
}
|