Visualizing Network Data in Real Time with Python

This is the last post in our series on using Python for network analysis.

In the previous posts, we used Scapy, Plotly, Pandas and more to analyze and visualize network data. All of those examples used data from packet capture (PCAP) files.

In this post, we’ll cover how to observe (sniff) packets in real time and plot them using Matplotlib.

Requirements

This was tested on OS X and should also work on Linux. You will need to install Scapy and Matplotlib:

#> pip3 install scapy matplotlib

 

Imports

The script imports Scapy to capture packets and Matplotlib to plot them. We'll also import argparse to read the interface as a command line option, and getuid from os to check whether we are root and can sniff on the specified interface:

from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid

Arguments

The user will supply the interface, and we will allow another option to only capture X packets before exiting:

parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()
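To try the parser without running the whole script, you can hand parse_args() a list instead of the real command line. This is only a small sketch: build_parser is a hypothetical helper name, and en0 is just the example interface used later in this post.

```python
import argparse


def build_parser():
    """Build a parser with a positional interface and an optional --count."""
    parser = argparse.ArgumentParser(description="Live Traffic Examiner")
    parser.add_argument("interface", help="Network interface", type=str)
    parser.add_argument("--count", help="Capture X packets and exit", type=int)
    return parser


# Passing a list to parse_args() stands in for the real command line.
args = build_parser().parse_args(["en0", "--count", "200"])
print(args.interface, args.count)  # en0 200

# Without --count the attribute is None, which the script treats as "no limit".
args = build_parser().parse_args(["en0"])
print(args.count)  # None
```

Because --count defaults to None, a plain "if args.count:" check is enough to tell whether a limit was requested.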

 

User Check

As mentioned above, we want to know whether the user can capture on the interface. Usually only root can do that, so if we are not root we print a warning and then try to sniff a single packet to see whether it works anyway:

#Check to see if we are root, otherwise Scapy might not be able to listen
if getuid() != 0:
   print("Warning: Not running as root, packet listening may not work.")
   try:
       print("--Trying to listen on {}".format(args.interface))
       sniff(iface=args.interface,count=1)
       print("--Success!")
   #Catch Exception rather than everything, so CTRL-C still interrupts the check
   except Exception:
       print("--Failed!\nError: Unable to sniff packets, try using sudo.")
       quit()

 
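If you want to reuse this check, it could be wrapped in a small function. The sketch below is one possible shape, not part of the original script: can_listen is a hypothetical name, and the sniff function is passed in as an argument so the logic can be exercised with stand-ins, without root or a real interface.

```python
import sys


def can_listen(sniff_fn, interface):
    """Return True if sniff_fn manages to capture one packet on interface.

    sniff_fn is a stand-in for Scapy's sniff; in the real script you
    would pass sniff itself.
    """
    try:
        sniff_fn(iface=interface, count=1)
    except Exception as err:  # unlike a bare except, this lets CTRL-C through
        print("--Failed! ({})".format(err), file=sys.stderr)
        return False
    return True


# Stand-in sniffers for illustration only; neither touches the network.
def fake_ok(iface, count):
    return []


def fake_denied(iface, count):
    raise PermissionError("Operation not permitted")


print(can_listen(fake_ok, "en0"))      # True
print(can_listen(fake_denied, "en0"))  # False
```

Keep in mind that with count=1 the real sniff waits until a packet actually arrives, so on a quiet interface the check can take a while.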

Building the plot

To start building the plot, we add a title and axis labels. We also call plt.ion() to turn on interactive mode, so the figure can be redrawn while the script keeps running:

#Interactive Mode
plt.ion()

#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real Time Network Traffic")
plt.tight_layout()
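Calling plt.plot() on every packet adds a new line to the figure each time. One alternative, sketched here as an assumption rather than what this post's script does, is to create a single line once and update its data in place. The names fig, ax, line and update_line are my own.

```python
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.set_xlabel("Count")
ax.set_ylabel("Bytes")
ax.set_title("Real Time Network Traffic")

# One empty line that will be updated in place instead of re-plotted.
(line,) = ax.plot([], [])


def update_line(sizes):
    """Replace the line's data with sizes and rescale the axes to fit."""
    line.set_data(list(range(len(sizes))), list(sizes))
    ax.relim()
    ax.autoscale_view()


# Example packet sizes, made up for illustration.
update_line([60, 1500, 52])
```

In a live loop you would still call plt.pause() after update_line() so the window gets a chance to redraw.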

 

Loop

The script will have a loop that listens for one packet at a time. First, we set up an empty list to hold the packet sizes and a counter for the number of packets plotted:

#Empty list to hold bytes
yData=[]
i=0

#Listen indefinitely, or until we reach count
while True:
   #Listen for 1 packet
   for pkt in sniff(iface=args.interface,count=1):

The important thing in the above code is count=1, because it tells Scapy to sniff for a single packet. Now that we have a single packet, the code enters a try/except block. The try/except lets the user hit CTRL-C to exit; otherwise the program will not exit easily:

       try:
           if IP in pkt:
               yData.append(pkt[IP].len)
               plt.plot(yData)
               #Pause and draw
               plt.pause(0.1)
               i+=1
               if args.count:
                   if i >= args.count:
                       quit()

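The code above appends the packet's IP length to the list, plots the list, and redraws the figure with plt.pause. It then increments the counter and exits once the counter reaches --count, if one was given. The matching except clause follows: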

       except KeyboardInterrupt:
           print("Captured {} packets on interface {} ".format(i, args.interface))
           quit()

Finally, the except clause catches CTRL-C, prints the number of packets captured, and exits.
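One thing to keep in mind with the loop above: without --count, yData keeps growing for as long as the script runs. If you only care about recent traffic, a fixed-size deque is one way to cap it. This is a sketch under my own assumptions; WINDOW and the sample sizes are made up for illustration.

```python
from collections import deque

WINDOW = 5  # hypothetical window size; pick whatever suits your plot

# A deque with maxlen drops its oldest entry once it is full.
recent = deque(maxlen=WINDOW)

for size in [60, 1500, 52, 40, 576, 1500, 64]:
    recent.append(size)

print(list(recent))  # [52, 40, 576, 1500, 64]
```

A deque supports append() just like the list in the script, so it could stand in for yData; converting it with list() before plotting keeps things simple.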

Usage

#> ./realtime.py -h
usage: realtime.py [-h] [--count COUNT] interface

Live Traffic Examiner

positional arguments:
 interface      Network interface to listen on i.e. en0

optional arguments:
 -h, --help     show this help message and exit
 --count COUNT  Capture X packets and exit

#> sudo ./realtime.py en0 --count 200
Password:
Capturing 200 packets on interface en0

For your reference, here is the full code:

from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid

parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()

#Check to see if we are root, otherwise Scapy might not be able to listen
if getuid() != 0:
   print("Warning: Not running as root, packet listening may not work.")
   try:
       print("--Trying to listen on {}".format(args.interface))
       sniff(iface=args.interface,count=1)
       print("--Success!")
   #Catch Exception rather than everything, so CTRL-C still interrupts the check
   except Exception:
       print("--Failed!\nError: Unable to sniff packets, try again using sudo.")
       quit()

if args.count:
   print("Capturing {} packets on interface {} ".format(args.count, args.interface))
else:
   print("Capturing unlimited packets on interface {} \n--Press CTRL-C to exit".format(args.interface))

#Interactive Mode
plt.ion()

#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real time Network Traffic")
plt.tight_layout()

#Empty list to hold bytes
yData=[]
i=0

#Listen indefinitely, or until we reach count
while True:
   #Listen for 1 packet
   for pkt in sniff(iface=args.interface,count=1):
       try:
           if IP in pkt:
               #Record the IP total length of the packet
               yData.append(pkt[IP].len)
               plt.plot(yData)
               #Pause and draw
               plt.pause(0.1)
               i+=1
               if args.count:
                   if i >= args.count:
                       quit()
       except KeyboardInterrupt:
           print("Captured {} packets on interface {} ".format(i, args.interface))
           quit()
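As a variation on the listing above, Scapy's sniff can also call a function for every packet through its prn argument, so the capture does not have to be restarted for each packet. The sketch below only collects sizes and leaves plotting out. make_handler and capture are hypothetical names of my own, and Scapy is imported inside capture so the handler can be tried with stand-in packets.

```python
from types import SimpleNamespace


def make_handler(sizes, layer):
    """Build a per-packet callback that appends the layer's len to sizes."""
    def handle(pkt):
        if layer in pkt:
            sizes.append(pkt[layer].len)
    return handle


def capture(interface, count=0):
    """Collect IP lengths from live traffic; count=0 means no limit."""
    from scapy.all import IP, sniff  # needs Scapy and capture privileges

    sizes = []
    # store=False keeps Scapy from holding every packet in memory.
    sniff(iface=interface, prn=make_handler(sizes, IP), count=count, store=False)
    return sizes


# Stand-in packets for illustration: plain dicts keyed by layer name.
demo_sizes = []
demo_handler = make_handler(demo_sizes, "IP")
demo_handler({"IP": SimpleNamespace(len=1500)})
demo_handler({"ARP": SimpleNamespace()})  # no IP layer, so it is skipped
print(demo_sizes)  # [1500]
```

With Scapy installed and enough privileges, capture("en0", count=200) would mirror the earlier usage example, minus the plot.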

 

Visualizing network data in real time with Python takeaway

I hope this series on analyzing network traffic with Python has helped you. You can take most of the code in the examples and customize it to your specific needs. I have a feeling that once you master this you’ll use Wireshark less and less.

