This is the last post in our series on using Python for network analysis. In the previous posts, we used Scapy, Plotly, Pandas and more to analyze and visualize network data. All of those examples used data from packet captures (PCAP) files. In this post, we’ll cover how to observe (sniff) packets in real time and plot them using Matplotlib.
Requirements
This was tested on OSX, but will work on any Linux machine. You will need to install Scapy and Matplotlib:
#> pip3 install scapy matplotlib
|
Imports
To run in real-time, you will import Scapy and Matplotlib. We’ll import argparse to get the interface as a command line option, and os to check to see if we are root and can snoop on the specified interface:
from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid
|
Arguments
The user will supply the interface, and we will allow another option to only capture X packets before exiting:
parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()
|
User Check
As mentioned above, we want to see if the user can access the interface. Usually, only root can do that, but we’ll allow a check to see if it works:
#Check to see if we are root, otherwise Scapy might not be able to listen if getuid() != 0 :
print("Warning: Not running as root, packet listening may not work.")
try:
print("--Trying to listen on {}".format(args.interface))
sniff(iface=args.interface,count=1)
print("--Success!")
except:
print("--Failed!\nError: Unable to sniff packets, try using sudo.")
quit()
|
Building the Plot
To start building the plot, we will add titles and labels to the plot. We will also use ion(), which allows for an interactive plot:
#Interactive Mode
plt.ion()
#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real Time Network Traffic")
plt.tight_layout()
|
Loop
The script will have a loop that listens for one packet at a time. Here, we’ll set up an empty list to hold a count and the byte data:
#Empty list to hold bytes
yData=[]
i=0
#Listen indefinitely, or until we reach count
while True:
#Listen for 1 packet
for pkt in sniff(iface=args.interface,count=1):
|
The important thing in the above code is count=1 because it tells Scapy to sniff for a single packet. Now that we have a single packet, the code will enter a try/except loop/. The reason for try/except is to allow the user to hit CTRL-C to exit, otherwise the program will not exit easily:
try:
if IP in pkt:
yData.append(pkt[IP].len)
plt.plot(yData)
#Pause and draw
plt.pause(0.1)
i+=1
if args.count:
if i >= args.count:
quit()
|
The code above adds the size of the packet to an array, adds it to the plot, and draws it with plt.pause:
except KeyboardInterrupt:
print("Captured {} packets on interface {} ".format(i, args.interface))
quit()
|
Last, we’ll use the except for CTRL-C to exit and send a count of packets.
Usage
#> ./realtime.py -h
usage: realtime.py [-h] [--count COUNT] interface
Live Traffic Examiner
positional arguments:
interface Network interface to listen on i.e. en0
optional arguments:
-h, --help show this help message and exit
--count COUNT Capture X packets and exit
|
#> sudo ./realtime.py en0 --count 200
Password:
Capturing 200 packets on interface en0
|
For your reference, here is the full code:
from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid
parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()
#Check to see if we are root, otherwise Scapy might not be able to listen
if getuid() != 0 :
print("Warning: Not running as root, packet listening may not work.")
try:
print("--Trying to listen on {}".format(args.interface))
sniff(iface=args.interface,count=1)
print("--Success!")
except:
print("--Failed!\nError: Unable to sniff packets, try again using sudo.")
quit()
if args.count:
print("Capturing {} packets on interface {} ".format(args.count, args.interface))
else:
print("Capturing unlimited packets on interface {} \n--Press CTRL-C to exit
".format(args.interface))
#Interactive Mode
plt.ion()
#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real time Network Traffic")
plt.tight_layout()
#Empty list to hold bytes
yData=[]
i=0
#Listen indefinitely, or until we reach count
while True:
#Listen for 1 packet
for pkt in sniff(iface=args.interface,count=1):
try:
if IP in pkt:
yData.append(pkt[IP].len)
plt.plot(yData)
#Pause and draw
plt.pause(0.1)
i+=1
if args.count:
if i >= args.count:
quit()
except KeyboardInterrupt:
print("Captured {} packets on interface {} ".format(i, args.interface))
quit()
|
Conclusion
I hope this series on analyzing network traffic with Python has helped you. You can take most of the code in the examples and customize it to your specific needs. I have a feeling that once you master this you’ll use Wireshark less and less. As always, feel free to let us know if you have any questions: support@automox.com.
About Automox
Facing growing threats and a rapidly expanding attack surface, understaffed and alert-fatigued organizations need more efficient ways to eliminate their exposure to vulnerabilities. Automox is a modern cyber hygiene platform that closes the aperture of attack by more than 80% with just half the effort of traditional solutions.
Cloud-native and globally available, Automox enforces OS & third-party patch management, security configurations, and custom scripting across Windows, Mac, and Linux from a single intuitive console. IT and SecOps can quickly gain control and share visibility of on-prem, remote and virtual endpoints without the need to deploy costly infrastructure.
Experience modern, cloud-native patch management today with a 15-day free trial of Automox and start recapturing more than half the time you're currently spending on managing your attack surface. Automox dramatically reduces corporate risk while raising operational efficiency to deliver best-in-class security outcomes, faster and with fewer resources.