1
1
package collectors
2
2
3
3
import (
4
+ "context"
4
5
"github.com/google/gopacket"
5
6
"github.com/google/gopacket/layers"
6
7
"github.com/google/gopacket/pcap"
@@ -9,6 +10,8 @@ import (
9
10
"github.com/otterize/network-mapper/src/sniffer/pkg/ipresolver"
10
11
"github.com/sirupsen/logrus"
11
12
"github.com/spf13/viper"
13
+ "net"
14
+ "sync"
12
15
"time"
13
16
)
14
17
@@ -39,18 +42,82 @@ func NewDNSSniffer(resolver ipresolver.IPResolver) *DNSSniffer {
39
42
return & s
40
43
}
41
44
42
- func (s * DNSSniffer ) CreateDNSPacketStream () (chan gopacket.Packet , error ) {
43
- handle , err := pcap .OpenLive ("any" , 0 , true , pcap .BlockForever )
45
+ type PacketChannelCombiner struct {
46
+ Channels []chan gopacket.Packet
47
+ combined chan gopacket.Packet
48
+ combinedOnce sync.Once
49
+ }
50
+
51
+ func NewPacketChannelCombiner (channels []chan gopacket.Packet ) * PacketChannelCombiner {
52
+ return & PacketChannelCombiner {
53
+ Channels : channels ,
54
+ }
55
+ }
56
+
57
+ func (p * PacketChannelCombiner ) Packets () chan gopacket.Packet {
58
+ p .combinedOnce .Do (func () {
59
+ p .combined = make (chan gopacket.Packet )
60
+ for _ , c := range p .Channels {
61
+ go func (channel chan gopacket.Packet ) {
62
+ for packet := range channel {
63
+ p .combined <- packet
64
+ }
65
+ }(c )
66
+ }
67
+ })
68
+ return p .combined
69
+ }
70
+
71
+ func (s * DNSSniffer ) CreatePacketChannelForInterface (iface net.Interface ) (result chan gopacket.Packet , err error ) {
72
+ doneCtx , cancel := context .WithTimeout (context .Background (), 2 * time .Second )
73
+ defer cancel ()
74
+ go func () {
75
+ defer cancel ()
76
+ handle , openLiveErr := pcap .OpenLive (iface .Name , 0 , true , pcap .BlockForever )
77
+ if openLiveErr != nil {
78
+ err = errors .Wrap (openLiveErr )
79
+ return
80
+ }
81
+ bpfErr := handle .SetBPFFilter ("udp port 53" )
82
+ if bpfErr != nil {
83
+ err = errors .Wrap (bpfErr )
84
+ return
85
+ }
86
+
87
+ packetSource := gopacket .NewPacketSource (handle , handle .LinkType ())
88
+ result = packetSource .Packets ()
89
+ return
90
+ }()
91
+ <- doneCtx .Done ()
92
+ if errors .Is (doneCtx .Err (), context .DeadlineExceeded ) {
93
+ return nil , errors .Errorf ("timed out starting capture on interface '%s': %w" , iface .Name , doneCtx .Err ())
94
+ }
44
95
if err != nil {
45
- return nil , errors .Wrap ( err )
96
+ return nil , errors .Errorf ( "failed to start capture on interface '%s': %w" , iface . Name , err )
46
97
}
47
- err = handle .SetBPFFilter ("udp port 53" )
98
+ return result , nil
99
+ }
100
+
101
+ func (s * DNSSniffer ) CreateDNSPacketStream () (chan gopacket.Packet , error ) {
102
+ interfaceList , err := net .Interfaces ()
48
103
if err != nil {
49
104
return nil , errors .Wrap (err )
50
105
}
51
106
52
- packetSource := gopacket .NewPacketSource (handle , handle .LinkType ())
53
- return packetSource .Packets (), nil
107
+ packetChans := make ([]chan gopacket.Packet , 0 )
108
+ for _ , iface := range interfaceList {
109
+ logrus .Debugf ("Starting capture on interface '%s'" , iface .Name )
110
+ packetChannel , err := s .CreatePacketChannelForInterface (iface )
111
+ if err != nil {
112
+ logrus .WithError (err ).Errorf ("failed to open packet channel for interface '%s', skipping" , iface .Name )
113
+ continue
114
+ }
115
+ packetChans = append (packetChans , packetChannel )
116
+ }
117
+ if len (packetChans ) == 0 {
118
+ return nil , errors .New ("no captures opened successfully" )
119
+ }
120
+ return NewPacketChannelCombiner (packetChans ).Packets (), nil
54
121
}
55
122
56
123
func (s * DNSSniffer ) HandlePacket (packet gopacket.Packet ) {
0 commit comments