summaryrefslogtreecommitdiff
path: root/internal/failover
diff options
context:
space:
mode:
authorjwijenbergh <jeroenwijenbergh@protonmail.com>2022-12-20 15:35:44 +0100
committerjwijenbergh <jeroenwijenbergh@protonmail.com>2022-12-21 18:28:00 +0100
commit6981666c6d8f639a1ff9c09a3bc08769e19928af (patch)
treebdb94d76a7fb6a08ef200e9bbbbd5fff1d6b134c /internal/failover
parent697dfed1f9f5d2916889a81a7a64bd1158caf2d2 (diff)
Failover: Initial implementation
Diffstat (limited to 'internal/failover')
-rw-r--r--internal/failover/failover.go21
-rw-r--r--internal/failover/monitor.go106
-rw-r--r--internal/failover/ping.go51
3 files changed, 178 insertions, 0 deletions
diff --git a/internal/failover/failover.go b/internal/failover/failover.go
new file mode 100644
index 0000000..f239eeb
--- /dev/null
+++ b/internal/failover/failover.go
@@ -0,0 +1,21 @@
+package failover
+
+import "time"
+
+const (
+ // Send a ping every 2 seconds to the gateway
+ pInterval time.Duration = 2 * time.Second
+
+ // pAlive is how many pings we need to have sent to check if the connection is alive
+ pAlive int = 3
+
+ // pDropped is how many pings we need to have sent to check if the connection is dropped
+ pDropped int = 5
+)
+
+// New creates a failover monitor for the gateway and the rx bytes function reader
+// This is a simple wrapper over `NewDroppedMonitor` to create one with the default settings
+// If this function returns True, the connection is dropped. False means it has exited and we don't know for sure if it's dropped or not
+func New(readRxBytes func() (int64, error)) (*DroppedConMon, error) {
+ return NewDroppedMonitor(pInterval, pAlive, pDropped, readRxBytes)
+}
diff --git a/internal/failover/monitor.go b/internal/failover/monitor.go
new file mode 100644
index 0000000..d14fb9e
--- /dev/null
+++ b/internal/failover/monitor.go
@@ -0,0 +1,106 @@
+package failover
+
+import (
+ "context"
+ "time"
+
+ "github.com/go-errors/errors"
+)
+
+// The DroppedConMon is a connection monitor that checks for an increase in rx bytes in certain intervals
+type DroppedConMon struct {
+ // pInterval means how the interval in which to send pings
+ pInterval time.Duration
+ // pAlive means how many pings need to be send before checking if the connection is alive
+ pAlive int
+ // pDropped means how many pings need to be send before checking if the connection is dropped
+ pDropped int
+ // The function that reads Rx bytes
+ // If this function returns an error, the monitor exits
+ readRxBytes func() (int64, error)
+ // The cancel context
+ // This is used to cancel the dropped connection monitor
+ cancel context.CancelFunc
+}
+
+func NewDroppedMonitor(pingInterval time.Duration, pAlive int, pDropped int, readRxBytes func() (int64, error)) (*DroppedConMon, error) {
+ if pAlive >= pDropped {
+ return nil, errors.New("pAlive must be smaller than pDropped")
+ }
+ return &DroppedConMon{pInterval: pingInterval, pAlive: pAlive, pDropped: pDropped, readRxBytes: readRxBytes}, nil
+}
+
+// Dropped checks whether or not the connection is 'dropped'
+// In other words, it checks if rx bytes has increased
+func (m *DroppedConMon) dropped(startBytes int64) (bool, error) {
+ b, err := m.readRxBytes()
+ if err != nil {
+ return false, err
+ }
+ return b <= startBytes, nil
+}
+
+// Start starts ticking every ping interval and check if the connection is dropped or alive
+// This does not check Rx bytes every tick, but rather when pAlive or pDropped is reached
+// It returns an error if there was an invalid input or a ping was failed to be sent
+func (m *DroppedConMon) Start(gateway string, mtuSize int) (bool, error) {
+ if mtuSize <= 0 {
+ return false, errors.New("invalid mtu size given")
+ }
+
+ // Create a context and save the cancel function
+ ctx, cancel := context.WithCancel(context.Background())
+ m.cancel = cancel
+ defer m.cancel()
+
+ // Create a ping struct with our mtu size
+ p, err := NewPinger(mtuSize)
+ if err != nil {
+ return false, err
+ }
+
+ // Read the start Rx bytes
+ b, err := m.readRxBytes()
+ if err != nil {
+ return false, err
+ }
+
+ // Create a new ticker that executes our ping function every 'interval' seconds
+ // It starts immediately and stops when we reach the end
+ ticker := time.NewTicker(m.pInterval)
+ defer ticker.Stop()
+
+ // Loop until the max drop counter
+ // We begin with 1 as this is used as the sequence number for ping
+ for s := 1; s <= m.pDropped; s++ {
+ // Send a ping and return if an error occurs
+ if err := p.Send(gateway, s); err != nil {
+ return false, err
+ }
+
+ // Early alive check
+ // If not dropped, return
+ if s == m.pAlive {
+ if d, err := m.dropped(b); !d {
+ return false, err
+ }
+ }
+ // Wait for the next tick to continue
+ select {
+ case <-ticker.C:
+ continue
+ case <-ctx.Done():
+ return false, errors.New("failover was cancelled")
+ }
+ }
+
+ // Dropped check if we have not returned early
+ return m.dropped(b)
+}
+
+// Cancel cancels the dropped connection failover monitor if there is one
+func (m *DroppedConMon) Cancel() {
+ if m.cancel != nil {
+ m.cancel()
+ }
+}
diff --git a/internal/failover/ping.go b/internal/failover/ping.go
new file mode 100644
index 0000000..2ffedd4
--- /dev/null
+++ b/internal/failover/ping.go
@@ -0,0 +1,51 @@
+package failover
+
+import (
+ "fmt"
+ "net"
+ "os"
+
+ "golang.org/x/net/icmp"
+ "golang.org/x/net/ipv4"
+
+ "github.com/go-errors/errors"
+)
+
+// mtuOverhead defines the total MTU overhead for an ICMP ECHO message: 20 bytes IP header + 8 bytes ICMP header
+var mtuOverhead = 28
+
+type Pinger struct {
+ listener net.PacketConn
+ buffer []byte
+}
+
+func NewPinger(size int) (*Pinger, error) {
+ l, err := icmp.ListenPacket("udp4", "0.0.0.0")
+ if err != nil {
+ return nil, errors.WrapPrefix(err, "failed creating ping", 0)
+ }
+ return &Pinger{listener: l, buffer: make([]byte, size-mtuOverhead)}, nil
+}
+
+func (p Pinger) Send(gateway string, seq int) error {
+ errorMessage := fmt.Sprintf("failed sending ping, seq %d", seq)
+ // Make a new ICMP message
+ m := icmp.Message{
+ Type: ipv4.ICMPTypeEcho, Code: 0,
+ Body: &icmp.Echo{
+ ID: os.Getpid() & 0xffff, Seq: seq,
+ Data: p.buffer,
+ },
+ }
+ // Marshal the message to bytes
+ b, err := m.Marshal(nil)
+ if err != nil {
+ return errors.WrapPrefix(err, errorMessage, 0)
+ }
+ // And send it to the gateway IP!
+ _, err = p.listener.WriteTo(b, &net.UDPAddr{IP: net.ParseIP(gateway)})
+ if err != nil {
+ return errors.WrapPrefix(err, errorMessage, 0)
+ }
+ return nil
+}