From 05baf0577325abd7528efbfd878e217113c0c942 Mon Sep 17 00:00:00 2001 From: Evgeniy Makeev Date: Wed, 28 Sep 2022 17:21:59 -0700 Subject: [PATCH] Flexible ping interval option (allows to send next ping packet as soon as the previous packet echo answer is received) Signed-off-by: Evgeniy Makeev --- ping.go | 58 +++++++++++++++++++++++++------ ping_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 139 insertions(+), 15 deletions(-) diff --git a/ping.go b/ping.go index 002502b..b275235 100644 --- a/ping.go +++ b/ping.go @@ -131,6 +131,9 @@ type Pinger struct { // interrupted. Count int + // FlexibleInterval flag tells pinger to send next packet as soon as it + // receives echo response to a previous packet + FlexibleInterval bool // Debug runs in debug mode Debug bool @@ -466,7 +469,8 @@ func (p *Pinger) runLoop( timeout.Stop() }() - if err := p.sendICMP(conn); err != nil { + err := p.sendICMP(conn) + if err != nil { return err } @@ -479,21 +483,32 @@ func (p *Pinger) runLoop( return nil case r := <-recvCh: - err := p.processPacket(r) + err = p.processPacket(r) if err != nil { - // FIXME: this logs as FATAL but continues - logger.Fatalf("processing received packet: %s", err) + if nce, nonCritical := err.(*NonCriticalError); !nonCritical { + logger.Errorf("processing received packet: %v", err) + } else { + logger.Debugf("%v for address: %s", nce, p.ipaddr) + } + continue + } + if p.FlexibleInterval { + interval.Reset(p.Interval) + if p.Count <= 0 || p.PacketsSent < p.Count { + err = p.sendICMP(conn) + if err != nil { + logger.Errorf("sending packet: %v", err) + } + } } - case <-interval.C: if p.Count > 0 && p.PacketsSent >= p.Count { interval.Stop() continue } - err := p.sendICMP(conn) + err = p.sendICMP(conn) if err != nil { - // FIXME: this logs as FATAL but continues - logger.Fatalf("sending packet: %s", err) + logger.Errorf("sending packet: %v", err) } } if p.Count > 0 && p.PacketsRecv >= p.Count { @@ -642,6 +657,24 @@ func (p *Pinger) getCurrentTrackerUUID() uuid.UUID { return p.trackerUUIDs[len(p.trackerUUIDs)-1] } +// NonCriticalError is a class of recoverable/non-critical errors +type NonCriticalError struct { + string +} + +func (e *NonCriticalError) Error() string { + if e == nil { + return "" + } + return e.string +} + +var ( + notEchoPacket = &NonCriticalError{"not an echo packet"} + mismatchedEchoPacketId = &NonCriticalError{"mismatched echo packet ID"} + duplicateEchoPacket = &NonCriticalError{"duplicate echo packet ID"} +) + func (p *Pinger) processPacket(recv *packet) error { receivedAt := time.Now() var proto int @@ -659,7 +692,7 @@ func (p *Pinger) processPacket(recv *packet) error { if m.Type != ipv4.ICMPTypeEchoReply && m.Type != ipv6.ICMPTypeEchoReply { // Not an echo reply, ignore it - return nil + return notEchoPacket } inPkt := &Packet{ @@ -673,7 +706,7 @@ func (p *Pinger) processPacket(recv *packet) error { switch pkt := m.Body.(type) { case *icmp.Echo: if !p.matchID(pkt.ID) { - return nil + return mismatchedEchoPacketId } if len(pkt.Data) < timeSliceLength+trackerLength { @@ -691,11 +724,14 @@ func (p *Pinger) processPacket(recv *packet) error { inPkt.Seq = pkt.Seq // If we've already received this sequence, ignore it. if _, inflight := p.awaitingSequences[*pktUUID][pkt.Seq]; !inflight { + p.statsMu.Lock() p.PacketsRecvDuplicates++ + p.statsMu.Unlock() + if p.OnDuplicateRecv != nil { p.OnDuplicateRecv(inPkt) } - return nil + return duplicateEchoPacket } // remove it from the list of sequences we're waiting for so we don't get duplicates. delete(p.awaitingSequences[*pktUUID], pkt.Seq) diff --git a/ping_test.go b/ping_test.go index b8755e7..0009419 100644 --- a/ping_test.go +++ b/ping_test.go @@ -97,7 +97,7 @@ func TestProcessPacket_IgnoreNonEchoReplies(t *testing.T) { } err = pinger.processPacket(&pkt) - AssertNoError(t, err) + AssertTrue(t, err == notEchoPacket) AssertTrue(t, shouldBe0 == 0) } @@ -140,7 +140,7 @@ func TestProcessPacket_IDMismatch(t *testing.T) { } err = pinger.processPacket(&pkt) - AssertNoError(t, err) + AssertTrue(t, err == mismatchedEchoPacketId) AssertTrue(t, shouldBe0 == 0) } @@ -220,7 +220,7 @@ func TestProcessPacket_LargePacket(t *testing.T) { } err = pinger.processPacket(&pkt) - AssertNoError(t, err) + AssertTrue(t, err == duplicateEchoPacket) } func TestProcessPacket_PacketTooSmall(t *testing.T) { @@ -628,7 +628,7 @@ func TestProcessPacket_IgnoresDuplicateSequence(t *testing.T) { AssertNoError(t, err) // receive a duplicate err = pinger.processPacket(&pkt) - AssertNoError(t, err) + AssertTrue(t, err == duplicateEchoPacket) AssertTrue(t, shouldBe0 == 1) AssertTrue(t, dups == 1) @@ -763,3 +763,91 @@ func TestRunOK(t *testing.T) { AssertTrue(t, stats.MinRtt >= 10*time.Millisecond) AssertTrue(t, stats.MinRtt <= 12*time.Millisecond) } + +type testPacketConnNoDelay struct { + testPacketConn + writeDone int32 + buf []byte + dst net.Addr +} + +func (c *testPacketConnNoDelay) WriteTo(b []byte, dst net.Addr) (int, error) { + c.buf = make([]byte, len(b)) + c.dst = dst + n := copy(c.buf, b) + atomic.StoreInt32(&c.writeDone, 1) + return n, nil +} + +func (c *testPacketConnNoDelay) ReadFrom(b []byte) (n int, ttl int, src net.Addr, err error) { + if atomic.LoadInt32(&c.writeDone) == 0 { + time.Sleep(1 * time.Millisecond) + return 0, 0, nil, nil + } + atomic.StoreInt32(&c.writeDone, 0) + msg, err := icmp.ParseMessage(ipv4.ICMPTypeEcho.Protocol(), c.buf) + if err != nil { + return 0, 0, nil, err + } + msg.Type = ipv4.ICMPTypeEchoReply + buf, err := msg.Marshal(nil) + if err != nil { + return 0, 0, nil, err + } + n = copy(b, buf) + time.Sleep(1 * time.Millisecond) + atomic.StoreInt32(&c.writeDone, 0) + return n, 64, c.dst, nil +} + +func TestFlexibleInterval(t *testing.T) { + // with flexible interval + pinger := New("10.20.30.40") + pinger.Count = 3 + pinger.FlexibleInterval = true + pinger.Interval = 10 * time.Millisecond + err := pinger.Resolve() + AssertNoError(t, err) + + conn := new(testPacketConnNoDelay) + + start := time.Now() + err = pinger.run(conn) + runDuration := time.Now().Sub(start) + AssertTrue(t, err == nil) + + stats := pinger.Statistics() + AssertTrue(t, stats != nil) + if stats == nil { + t.FailNow() + } + AssertTrue(t, stats.PacketsSent == 3) + AssertTrue(t, stats.PacketsRecv == 3) + AssertTrue(t, stats.MinRtt > 0) + AssertTrue(t, runDuration < 20*time.Millisecond) + + // without flexible interval + pinger = New("10.21.33.44") + pinger.Count = 3 + pinger.Interval = 10 * time.Millisecond + err = pinger.Resolve() + AssertNoError(t, err) + + conn = new(testPacketConnNoDelay) + + start = time.Now() + err = pinger.run(conn) + runDuration = time.Now().Sub(start) + + AssertTrue(t, err == nil) + + stats = pinger.Statistics() + AssertTrue(t, stats != nil) + if stats == nil { + t.FailNow() + } + AssertTrue(t, stats.PacketsSent == 3) + AssertTrue(t, stats.PacketsRecv == 3) + AssertTrue(t, runDuration >= 20*time.Millisecond) + AssertTrue(t, runDuration < 40*time.Millisecond) +}