
Automating Firewall Testing on Linux

16 min read

A while ago my work team was assigned the task of migrating from one firewall rule generator to another. In both cases the output was iptables rules for Linux servers. As we wanted to ensure a smooth transition, we were looking for a way to actually test the new rulesets before deploying them into production. A bit of googling did not yield any results, so we came up with our own solution.

The same approach should work when transitioning from iptables to nftables, but we have not tested that. In our scenario the firewall systems were acting as gateways for layer 3 segments (VLANs) with multiple layer 3 upstream interfaces, roughly with the following topology:

Routing Firewall Topology

This blog post will explain the overall methodology, but it does not come with ready-to-use code. You have to figure that last part out yourself and adapt it to your infrastructure :-)

The Secret Sauce

Contrary to what you might believe, we will not be using any bleeding edge technology here. Everything that is required has been around for ages:

- network namespaces
- veth interfaces
- tcpdump
- Python and the Scapy library

What Are Network Namespaces?

The concept of Linux namespaces dates back to 2002 and kernel 2.4.19; network namespaces allow for a complete separation of the Linux network stack - interfaces can only be present in one network namespace and will not be visible/usable from any other namespace. Essentially it is the foundation of all modern container networking. Each network namespace has its own interfaces, routing tables, firewall rules, connection tracking and sockets/listeners. Linux also supports VRFs, which is a different concept that only cares about separating layer 3 / routing information and cannot be used for isolation / security purposes.

Linux uses the iproute2 package to create/manage network namespaces from the command line, more specifically the ip netns command. The most important commands are (testns being a throwaway example name):
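# create a new, empty network namespace
ip netns add testns
# execute a command inside the namespace
ip netns exec testns ip addr
# list all named network namespaces
ip netns list
# delete the namespace again
ip netns del testns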

What Are veth Interfaces?

veth are virtual ethernet devices which can be used to connect two namespaces to each other. They always come in pairs with both ends residing in different namespaces; the following example is taken directly from the veth man page:

ip link add <p1-name> netns <p1-ns> type veth peer <p2-name> netns <p2-ns>

We will use these devices to allow packets to flow from one namespace to the next.
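To make this more tangible, here is a minimal sketch that connects two throwaway namespaces with a veth pair and verifies connectivity with a ping (all names and addresses are made up for illustration):

ip netns add ns-a
ip netns add ns-b
ip link add veth-a netns ns-a type veth peer veth-b netns ns-b
ip netns exec ns-a ip link set up dev veth-a
ip netns exec ns-b ip link set up dev veth-b
ip netns exec ns-a ip addr add 10.0.0.1/30 dev veth-a
ip netns exec ns-b ip addr add 10.0.0.2/30 dev veth-b
ip netns exec ns-a ping -c 1 10.0.0.2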

A Word on Python and Scapy

In our scenario we used Python and the Scapy library, mainly because we already knew Python. Scapy is an easy way to work with PCAPs and packet generation in Python. However, neither Python nor Scapy are exactly fast at what they are doing. We worked around this by using Python multiprocessing, which gave us a reasonable improvement both for parsing PCAPs and for generating test traffic.

If you need more performance and you are more of a Go(lang) person, there are libraries for PCAP reading and packet generation, but I do not have any experience with those.

From A Bird’s Eye View

In essence we will be doing the following:

- capture traffic that is allowed to pass the existing firewall and turn it into test data
- extract blocked traffic from the existing firewall logs and turn it into test data as well
- build an isolated sender/firewall/receiver environment out of network namespaces and veth pairs
- load the new ruleset into the firewall namespace
- replay the test data from the sender namespace
- capture what arrives in the receiver namespace and compare it against the input

Capture Allowed Traffic & Gather Test Data

To test our new firewall ruleset, we need test data - ideally both for traffic that must pass the firewall and for traffic that must be blocked. If you skip the latter you may actually overlook that your shiny new firewall simply allows any packet to pass :-)

We will be using tcpdump to gather data on traffic flows that are actually allowed to pass the firewall. For this to work we need to dump traffic after it has been processed by the firewall. If we stick with the example from above, traffic entering vlan 100 (198.51.100.0/26) must be captured on interface bond0.100 while traffic leaving this vlan must be captured on both upstream interfaces eth0 and eth1.

To keep the amount of data stored on disk to a minimum (and thus allow for dumping data over longer periods of time) we can use some tricks:

- capture only the first 44 bytes of each packet (-s 44), which covers the Ethernet and IP headers plus the layer 4 port numbers
- for TCP, capture only connection attempts, i.e. packets with the SYN flag set and the ACK flag unset
- for UDP, capture only packets with destination ports below 10000 to skip most reply traffic directed at ephemeral source ports
- rotate the capture across a limited number of files of limited size (-C 50 -W 20)

YMMV, of course: depending on the types of traffic in your network you may need to broaden or narrow your filters. Also, we are mostly running Debian Linux and we know that other distributions/OSes use ephemeral source port ranges starting below 15000, so you might need to adapt the port filter to exclude further UDP reply traffic.

The following examples show how to gather traffic for our example vlan 100 in both directions (outgoing traffic has to be captured on both upstream interfaces):

# incoming
export INTERFACE=bond0.100
export NETWORK=198.51.100.0/26
sudo tcpdump -s 44 -ni $INTERFACE -w incoming_${INTERFACE}_traffic.pcap -C 50 -W 20 "dst net $NETWORK and ((tcp[tcpflags] & (tcp-syn) != 0 and tcp[tcpflags] & (tcp-ack) == 0) or (udp dst portrange 1-9999))"

# outgoing
export INTERFACE=eth0
export NETWORK=198.51.100.0/26
sudo tcpdump -s 44 -ni $INTERFACE -w outgoing_${INTERFACE}_traffic.pcap -C 50 -W 20 "src net $NETWORK and ((tcp[tcpflags] & (tcp-syn) != 0 and tcp[tcpflags] & (tcp-ack) == 0) or (udp dst portrange 1-9999))"

export INTERFACE=eth1
export NETWORK=198.51.100.0/26
sudo tcpdump -s 44 -ni $INTERFACE -w outgoing_${INTERFACE}_traffic.pcap -C 50 -W 20 "src net $NETWORK and ((tcp[tcpflags] & (tcp-syn) != 0 and tcp[tcpflags] & (tcp-ack) == 0) or (udp dst portrange 1-9999))"

These PCAPs will now contain lots of duplicated flows (e.g. DNS requests, short-lived HTTP sessions etc.) which only hog resources and extend testing time without providing any real value. Consider that your firewall ruleset might not work on the first attempt and you will need to fix things and re-run the tests over and over again. This is where the PCAP-reading capability of Scapy comes into play: we can analyze the acquired PCAPs, remove duplicate traffic and store the resulting data in a format that is easier to parse (e.g. CSV) for the actual test runs.

The following Python script will analyze all given PCAP files (using Python multiprocessing to parse them in parallel), recognize UDP, TCP, SCTP and GRE flows, eliminate duplicates along the way and write the result to a CSV file:

#!/usr/bin/env python3

import argparse
import csv
import time
from multiprocessing import Pool

from scapy.all import *

# configure enabled layers for dissecting
conf.layers.filter([Ether, IP, ICMP, TCP, UDP, SCTP, GRE])
# configure/overwrite payload guessing for specific protocols
Ether.payload_guess = [({"type": 0x800}, IP)]
TCP.payload_guess = []
UDP.payload_guess = []


def main():
    parser = argparse.ArgumentParser(description='Prepare Input for nqfilter fw testing.')
    parser.add_argument('--input', '-i', help='List of pcap files', nargs='+', default=[])
    parser.add_argument('--output', '-o', help='Output file name', default='prepared_input.csv')

    args = parser.parse_args()

    with Pool() as pool:
        print("starting workers")
        parsed_pcaps = pool.map(parse_pcap, args.input)

    print("All workers finished, writing csv file")
    used_flows = set()
    start_time = time.time()
    written_rows = 0
    with open(args.output, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=';')
        writer.writerow(['connection_type', 'src_ip', 'src_port', 'dst_ip', 'dst_port'])
        for data in parsed_pcaps:
            for row in data:
                flow_tuple = (row["proto"], row["src_ip"], row["dst_ip"], row["dst_port"])
                if flow_tuple not in used_flows:
                    written_rows += 1
                    used_flows.add(flow_tuple)
                    writer.writerow([row["proto"], row["src_ip"], row["src_port"], row["dst_ip"], row["dst_port"]])

    end_time = time.time()
    duration = end_time - start_time
    print("Wrote %d lines in %ds" % (written_rows, duration))


def parse_pcap(file):
    used_flows = set()
    return_flows = []
    processed_packets = 0
    start_time = time.time()

    with PcapReader(file) as packets:
        for packet in packets:
            processed_packets += 1
            if packet.haslayer(IP):
                src_ip = packet[IP].src
                dst_ip = packet[IP].dst
                if packet.haslayer(UDP):
                    layer4_proto = UDP
                    proto_string = 'UDP'
                elif packet.haslayer(TCP):
                    layer4_proto = TCP
                    proto_string = 'TCP'
                elif packet.haslayer(SCTP):
                    layer4_proto = SCTP
                    proto_string = 'SCTP'
                elif packet.haslayer(GRE):
                    layer4_proto = GRE
                    proto_string = 'GRE'
                else:
                    continue

                if layer4_proto is GRE:
                    # GRE carries no port information, record zeroes instead
                    src_port = 0
                    dst_port = 0
                else:
                    src_port = packet[layer4_proto].sport
                    dst_port = packet[layer4_proto].dport
                flow_tuple = (layer4_proto, src_ip, dst_ip, dst_port)

                if flow_tuple not in used_flows:
                    used_flows.add(flow_tuple)
                    return_flows.append({
                        "proto": proto_string,
                        "src_ip": src_ip,
                        "src_port": src_port,
                        "dst_ip": dst_ip,
                        "dst_port": dst_port})

    end_time = time.time()
    duration = end_time - start_time
    print("Processed %d packets in %ds" % (processed_packets, duration))
    return return_flows


if __name__ == "__main__":
    main()

It would be invoked as follows (with the PCAPs in the same folder):

export INTERFACE=bond0.100
./prepare_input.py -i incoming_${INTERFACE}_traffic.pcap* -o incoming_${INTERFACE}_traffic.csv
./prepare_input.py -i outgoing_eth*_traffic.pcap* -o outgoing_${INTERFACE}_traffic.csv

This will provide you with two CSV files containing the fields: protocol, source IP, source port, destination IP, destination port.
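For illustration, the first few lines of such a CSV could look like this (the addresses are made-up examples from documentation ranges):

connection_type;src_ip;src_port;dst_ip;dst_port
TCP;203.0.113.10;43211;198.51.100.12;443
UDP;203.0.113.25;40005;198.51.100.3;53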

Capture Blocked Traffic & Gather Test Data

Our firewall systems use ulogd to log dropped packets. It provides us with a logfile (say /var/log/firewall/firewall.log) containing lines similar to this:

May 20 09:00:00 fancy-firewall IN=eth0 OUT=bond0.100 MAC=aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:00:00 SRC=1.2.3.4 DST=198.51.100.88 LEN=60 TOS=00 PREC=0x00 TTL=48 ID=16114 PROTO=TCP SPT=17499 DPT=443 SEQ=2838500987 ACK=0 WINDOW=42340 SYN URGP=0 MARK=0

We can build a simple parser script using e.g. string splitting or regular expressions, find the lines that concern our network (either in SRC or DST) and build a CSV file analogous to the output of the prepare_input.py script above. This way we get a list of packets which have been blocked by our existing firewall (either entering or leaving the network).
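The following is only a sketch under the assumption that the log lines look exactly like the example above; the log path, the network and the output file name are placeholders you would adapt:

#!/usr/bin/env python3

import csv
import ipaddress

# placeholders - adapt to your environment
LOGFILE = '/var/log/firewall/firewall.log'
NETWORK = ipaddress.ip_network('198.51.100.0/26')
OUTPUT = 'blocked_traffic.csv'


def parse_line(line):
    # turn "... SRC=1.2.3.4 DST=... PROTO=TCP SPT=17499 DPT=443 ..." into a dict
    return dict(part.split('=', 1) for part in line.split() if '=' in part)


with open(LOGFILE) as logfile, open(OUTPUT, 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=';')
    writer.writerow(['connection_type', 'src_ip', 'src_port', 'dst_ip', 'dst_port'])
    seen = set()
    for line in logfile:
        fields = parse_line(line)
        if fields.get('PROTO') not in ('TCP', 'UDP') or 'SRC' not in fields or 'DST' not in fields:
            continue
        src, dst = fields['SRC'], fields['DST']
        # only keep lines that concern our network
        if ipaddress.ip_address(src) not in NETWORK and ipaddress.ip_address(dst) not in NETWORK:
            continue
        flow = (fields['PROTO'], src, dst, fields.get('DPT', '0'))
        if flow in seen:
            continue
        seen.add(flow)
        writer.writerow([fields['PROTO'], src, fields.get('SPT', '0'), dst, fields.get('DPT', '0')])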

Setup The Namespaces

The general idea of the namespaces is that packets are generated inside the sender namespace. There is only one interface, veth0, which connects this namespace to the firewall namespace. A default route in the sender namespace makes sure that every generated packet actually gets forwarded to the firewall namespace. The firewall namespace in turn is connected to the receiver namespace through veth2, again with a default route pointing to the receiver namespace. This way any packet generated in the sender namespace will inevitably end up in the receiver namespace unless firewall rules in the firewall namespace block it from being forwarded.

Firewall Namespace Layout

The following shell script will set up the actual namespace environment (and also clean up existing namespaces of the same name beforehand):

#!/bin/bash

set -e

if [ "$UID" -ne 0 ]; then
    echo "Please run as root!"
    exit 1
fi

insert_firewall_rules() {
    # add whatever is required to load your firewall rules inside the
    # firewall namespace, e.g.:

    # cat firewall-rules | ip netns exec firewall iptables-restore
    # ip netns exec firewall /path/to/firewall-script.sh

    # bash requires at least one command inside a function body,
    # so keep this placeholder until you add your own logic
    true
}

exec_ns() {
    ip netns exec "$@"
}

cleanup() {
    for netns in sender firewall receiver; do
        ip netns del ${netns} || true
    done
}



cleanup

# setup namespaces
ip netns add sender
ip netns add firewall
ip netns add receiver

# enable ip forwarding inside the firewall namespace
# (net.ipv4.ip_forward is a per-namespace setting)
exec_ns firewall sysctl -w net.ipv4.ip_forward=1

# setup veth pairs
ip link add veth0 netns sender   type veth peer veth1 netns firewall
ip link add veth2 netns firewall type veth peer veth3 netns receiver

# enable veth interfaces
exec_ns sender   ip link set up dev veth0
exec_ns firewall ip link set up dev veth1
exec_ns firewall ip link set up dev veth2
exec_ns receiver ip link set up dev veth3

# setup sender IP
exec_ns sender   ip a a 192.168.1.1/30 dev veth0

# setup firewall IPs
exec_ns firewall ip a a 192.168.1.2/30 dev veth1
exec_ns firewall ip a a 192.168.1.5/30 dev veth2

# setup receiver IP
exec_ns receiver ip a a 192.168.1.6/30 dev veth3

# configure default routes sender -> firewall -> receiver
exec_ns sender   ip r a default via 192.168.1.2
exec_ns firewall ip r a default via 192.168.1.6

# generate/insert firewall rules into namespace
insert_firewall_rules

You need to modify the function insert_firewall_rules() and add whatever logic is required to load your ruleset. There are two caveats you need to take care of before injecting your firewall rules into the firewall namespace:

- Your production ruleset references the real interface names (in our example bond0.100, eth0 and eth1), which do not exist inside the firewall namespace. You either have to rewrite those names to veth1/veth2 before loading the rules, or rename the veth interfaces accordingly (a sketch follows below).
- This script uses IP addresses from 192.168.1.0/24 to route packets between the namespaces. If these addresses are also used in your infrastructure or appear in your test data, you must use different addresses to not break/confuse your firewall testing.
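For the first caveat, a rough sketch of how rules in iptables-save format could be rewritten on the fly before loading them. The file name firewall-rules is an assumption, and the mapping shown here fits the incoming-traffic test, where the upstream interfaces correspond to veth1 and bond0.100 corresponds to veth2; for the outgoing test the mapping is reversed:

# map production interface names to the namespace veth devices, then load the rules
sed -e 's/eth[01]/veth1/g' -e 's/bond0\.100/veth2/g' firewall-rules \
    | ip netns exec firewall iptables-restore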

Sending Packets

The following Python script will generate packets as defined in the given CSV file. It uses multiprocessing to speed up testing and needs to be executed in the sender namespace:

#!/usr/bin/env python3

import argparse
import csv
from multiprocessing import Pool, RLock

from scapy.all import *
from tqdm import tqdm


def main():
    parser = argparse.ArgumentParser(description='Send packets from a csv file.')
    parser.add_argument('--input', '-i', help='CSV file')
    parser.add_argument('--workers', '-w', default=80, type=int, help='Number of workers to send packets')
    args = parser.parse_args()

    with open(args.input, newline='') as csvfile:
        total_packet_count = sum(1 for row in csvfile) - 1

    with open(args.input, newline='') as csvfile:
        packets = csv.reader(csvfile, delimiter=';')
        next(packets)

        tqdm.set_lock(RLock())
        with Pool(processes=args.workers) as pool:
            list(tqdm(pool.imap(send_packet, packets), total=total_packet_count, unit="packets"))


def send_packet(packet):
    layer4_proto = packet[0]
    src_ip = packet[1]
    src_port = int(packet[2])
    dst_ip = packet[3]
    dst_port = int(packet[4])
    if layer4_proto == 'TCP':
        send(IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags='S'), verbose=False)
    elif layer4_proto == 'UDP':
        send(IP(src=src_ip, dst=dst_ip) / UDP(sport=src_port, dport=dst_port), verbose=False)
    elif layer4_proto == 'GRE':
        return
    elif layer4_proto == 'SCTP':
        sctp_packet = IP(src=src_ip, dst=dst_ip) / SCTP(sport=src_port, dport=dst_port) / SCTPChunkInit(init_tag=5, a_rwnd=106496, n_out_streams=2, n_in_streams=2, init_tsn=11)
        send(sctp_packet, verbose=False)

if __name__ == "__main__":
    main()

It can be invoked as follows (-w sets the number of parallel workers to use and depends on your test machine):

sudo ip netns exec sender ./send_packets.py -i incoming_bond0.100_traffic.csv -w 200

Detecting Packets On The Receiver Side

Again, we can use tcpdump to capture any packets which enter the receiver namespace:

sudo ip netns exec receiver tcpdump -nli veth3 -w test_capture.pcap ip

The above command should be executed right before send_packets.py is invoked and can be stopped once send_packets.py has finished its job. For a complete result, we need to run three tests in a row:

- one run with the CSV of allowed incoming traffic (incoming_bond0.100_traffic.csv in our example)
- one run with the CSV of allowed outgoing traffic
- one run with the CSV of blocked traffic extracted from the firewall logs, where no packet should reach the receiver at all
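One possible way to script a single run (file names are taken from the examples above; this assumes you are running as root, just like the namespace setup script):

ip netns exec receiver tcpdump -nli veth3 -w test_capture.pcap ip &
TCPDUMP_PID=$!
sleep 2
ip netns exec sender ./send_packets.py -i incoming_bond0.100_traffic.csv -w 200
sleep 2
kill $TCPDUMP_PID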

Evaluating Test Results

We can re-use what we already have and create a CSV file from the PCAP we just acquired in the receiver namespace:

./prepare_input.py -i test_capture.pcap -o test_capture.csv

We now have a CSV of the packets that were generated and another CSV of packets which were received. Ideally, both should be exactly the same. To validate this, we can make use of the shell commands cat, sort and diff.

cat test_capture.csv | sort > test_capture_sorted.csv
cat incoming_${INTERFACE}_traffic.csv | sort > incoming_${INTERFACE}_traffic_sorted.csv

diff incoming_${INTERFACE}_traffic_sorted.csv test_capture_sorted.csv

How to read the diff? We diff the sorted input CSV against the sorted capture CSV, so lines prefixed with < exist only in the input, i.e. packets that were sent but never showed up in the receiver namespace - flows the new ruleset blocks although it should not. Lines prefixed with > exist only in the capture and should not normally occur. If the diff is empty, the new ruleset forwarded exactly the traffic we expected.

Evaluating the “blocked packets” test is a bit easier: we only need to make sure tcpdump has recorded exactly zero packets in the receiver namespace. If anything has passed the firewall (although it should not have), we can look at the PCAP and see which packets got through.
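A quick way to verify this could be to count the packets in the capture from the blocked-traffic run (assuming it was written to test_capture.pcap) - the result should be 0:

tcpdump -nr test_capture.pcap | wc -l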

Bringing All Of That Together

If you need to carry out the above procedure often, it helps to automate the following steps:

- setting up (and tearing down) the namespace environment
- loading the ruleset to be tested into the firewall namespace
- starting tcpdump in the receiver namespace
- running send_packets.py for each of the three test CSVs
- converting the resulting captures to CSV and diffing them against the input

We have built that, too - again using Python. But since the whole script is heavily customized for our internal infrastructure, I cannot share it here.

Things To Consider

Capturing live traffic for testing/validation works best with short-lived connections. It might fail to detect very long-running TCP streams as well as irregular connections. If you know your infrastructure well, you will probably have a rough idea of when dumping traffic will yield the best results (e.g. include scheduled backup runs, automatic maintenance reboots, software deployments, cover at least 24 hours etc.). That said, you will get a pretty decent idea of whether your new firewall ruleset does the same as the old one did. However, you might still miss one flow or another.

Originally published by Rudolph Bott