Using Scapy to send Network Packets¶

This notebook is to explore the use of Scapy, a network packet tool developed in Python. Scapy can be used as a standalone application or as a Python library - we will use the library to keep within our notebook environment.

Note: To execute most commands in Scapy, you will require root priviledges. It is not recommended to typically start JupyterLab with root riviledges, however we will do so purely for the purpose of this lab activity. For JupyterLab, you can start your notebook session by typing:

  • sudo python -m jupyterlab --allow-root

You should open Wireshark on your UWEcyber VM whilst interacting with this notebook, to observe the traffic behaviour.

For the first tasks below, use the 'icmp' filter in Wireshark on your network interface.

You will find more useful resources for Scapy at:

  • https://scapy.net/
  • https://thepacketgeek.com/scapy/building-network-tools/
In [3]:
from scapy.all import ICMP, IP, sr1
In [6]:
print(sr1(IP(dst="192.168.59.200")/ICMP()).summary())
Begin emission:
Finished sending 1 packets.

Received 2 packets, got 1 answers, remaining 0 packets
.*IP / ICMP 192.168.59.200 > 192.168.59.136 echo-reply 0 / Padding
In [7]:
from scapy.all import sniff

pkts = sniff(filter="arp", count=10)
print(pkts.summary())
Ether / ARP who has 192.168.0.154 says 0.0.0.0 / Padding
Ether / ARP who has 192.168.0.104 says 192.168.0.104 / Padding
Ether / ARP who has 192.168.0.1 says 192.168.0.154 / Padding
Ether / ARP who has 192.168.0.1 says 192.168.0.139 / Padding
Ether / ARP who has 192.168.0.240 says 192.168.0.240 / Padding
Ether / ARP who has 192.168.0.1 says 192.168.0.194 / Padding
Ether / ARP who has 192.168.0.1 says 192.168.0.25
Ether / ARP is at ac:f8:cc:c6:f9:8b says 192.168.0.1 / Padding
Ether / ARP who has 192.168.0.50 says 192.168.0.50 / Padding
Ether / ARP who has 192.168.0.35 says 192.168.0.35 / Padding
None
In [8]:
from scapy.all import ARP, sniff

def arp_display(pkt):
    if pkt[ARP].op == 1:  # who-has (request)
        return f"Request: {pkt[ARP].psrc} is asking about {pkt[ARP].pdst}"
    if pkt[ARP].op == 2:  # is-at (response)
        return f"*Response: {pkt[ARP].hwsrc} has address {pkt[ARP].psrc}"

sniff(prn=arp_display, filter="arp", store=0, count=10)
Request: 192.168.0.23 is asking about 192.168.0.23
Request: 192.168.0.52 is asking about 192.168.0.52
Request: 192.168.0.100 is asking about 192.168.0.1
Request: 192.168.0.233 is asking about 192.168.0.233
Request: 192.168.0.104 is asking about 192.168.0.104
Request: 192.168.0.15 is asking about 192.168.0.1
Request: 0.0.0.0 is asking about 192.168.0.154
Request: 192.168.0.240 is asking about 192.168.0.240
Request: 192.168.0.50 is asking about 192.168.0.50
Request: 192.168.0.35 is asking about 192.168.0.35
Out[8]:
<Sniffed: TCP:0 UDP:0 ICMP:0 Other:0>

Making a Christmas Tree Packet¶

In [11]:
from random import randint
from scapy.all import IP, TCP, send

# Create the skeleton of our packet
template = IP(dst="192.168.59.200")/TCP()

# Start lighting up those bits!
template[TCP].flags = 'UFP'

# Create a list with a large number of packets to send
# Each packet will have a random TCP dest port for attack obfuscation
xmas = []
for pktNum in range(0,100):
  xmas.extend(template)
  xmas[pktNum][TCP].dport = randint(1,65535)

# Send the list of packets
send(xmas)
....................................................................................................
Sent 100 packets.

DNS¶

In [13]:
from scapy.all import DNS, DNSQR, IP, sr1, UDP

dns_req = IP(dst='8.8.8.8')/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname='www.thepacketgeek.com'))
answer = sr1(dns_req, verbose=0)

print(answer[DNS].summary())
DNS Ans "b'thepacketgeek.github.io.'" 
In [15]:
from scapy.all import DNS, DNSQR, IP, sr1, UDP

dns_req = IP(dst='8.8.8.8')/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname='uwe.ac.uk'))
answer = sr1(dns_req, verbose=0)

print(answer[DNS].summary())
DNS Ans "164.11.4.40" 
In [16]:
from scapy.all import DNS, DNSQR, DNSRR, IP, send, sniff, sr1, UDP

IFACE = "lo0"   # Or your default interface
DNS_SERVER_IP = "127.0.0.1"  # Your local IP

BPF_FILTER = f"udp port 53 and ip dst {DNS_SERVER_IP}"


def dns_responder(local_ip: str):

    def forward_dns(orig_pkt: IP):
        print(f"Forwarding: {orig_pkt[DNSQR].qname}")
        response = sr1(
            IP(dst='8.8.8.8')/
                UDP(sport=orig_pkt[UDP].sport)/
                DNS(rd=1, id=orig_pkt[DNS].id, qd=DNSQR(qname=orig_pkt[DNSQR].qname)),
            verbose=0,
        )
        resp_pkt = IP(dst=orig_pkt[IP].src, src=DNS_SERVER_IP)/UDP(dport=orig_pkt[UDP].sport)/DNS()
        resp_pkt[DNS] = response[DNS]
        send(resp_pkt, verbose=0)
        return f"Responding to {orig_pkt[IP].src}"

    def get_response(pkt: IP):
        if (
            DNS in pkt and
            pkt[DNS].opcode == 0 and
            pkt[DNS].ancount == 0
        ):
            if "trailers.apple.com" in str(pkt["DNS Question Record"].qname):
                spf_resp = IP(dst=pkt[IP].src)/UDP(dport=pkt[UDP].sport, sport=53)/DNS(id=pkt[DNS].id,ancount=1,an=DNSRR(rrname=pkt[DNSQR].qname, rdata=local_ip)/DNSRR(rrname="trailers.apple.com",rdata=local_ip))
                send(spf_resp, verbose=0, iface=IFACE)
                return f"Spoofed DNS Response Sent: {pkt[IP].src}"

            else:
                # make DNS query, capturing the answer and send the answer
                return forward_dns(pkt)

    return get_response

sniff(filter=BPF_FILTER, prn=dns_responder(DNS_SERVER_IP), iface=IFACE)
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
Cell In [16], line 41
     37                 return forward_dns(pkt)
     39     return get_response
---> 41 sniff(filter=BPF_FILTER, prn=dns_responder(DNS_SERVER_IP), iface=IFACE)

File /usr/local/lib/python3.8/dist-packages/scapy/sendrecv.py:1263, in sniff(*args, **kwargs)
   1259 @conf.commands.register
   1260 def sniff(*args, **kwargs):
   1261     # type: (*Any, **Any) -> PacketList
   1262     sniffer = AsyncSniffer()
-> 1263     sniffer._run(*args, **kwargs)
   1264     return cast(PacketList, sniffer.results)

File /usr/local/lib/python3.8/dist-packages/scapy/sendrecv.py:1127, in AsyncSniffer._run(self, count, store, offline, quiet, prn, lfilter, L2socket, timeout, opened_socket, stop_filter, iface, started_callback, session, session_kwargs, **karg)
   1121         sniff_sockets.update(
   1122             (L2socket(type=ETH_P_ALL, iface=ifname, **karg),
   1123              iflabel)
   1124             for ifname, iflabel in six.iteritems(iface)
   1125         )
   1126     else:
-> 1127         sniff_sockets[L2socket(type=ETH_P_ALL, iface=iface,
   1128                                **karg)] = iface
   1130 # Get select information from the sockets
   1131 _main_socket = next(iter(sniff_sockets))

File /usr/local/lib/python3.8/dist-packages/scapy/arch/linux.py:497, in L2Socket.__init__(self, iface, type, promisc, filter, nofilter, monitor)
    495 if filter is not None:
    496     try:
--> 497         attach_filter(self.ins, filter, self.iface)
    498     except ImportError as ex:
    499         log_runtime.error("Cannot set filter: %s", ex)

File /usr/local/lib/python3.8/dist-packages/scapy/arch/linux.py:166, in attach_filter(sock, bpf_filter, iface)
    157 def attach_filter(sock, bpf_filter, iface):
    158     # type: (socket.socket, str, Union[NetworkInterface, str]) -> None
    159     """
    160     Compile bpf filter and attach it to a socket
    161 
   (...)
    164     :param iface: the interface used to compile
    165     """
--> 166     bp = compile_filter(bpf_filter, iface)
    167     if conf.use_pypy and sys.pypy_version_info <= (7, 3, 2):  # type: ignore
    168         # PyPy < 7.3.2 has a broken behavior
    169         # https://foss.heptapod.net/pypy/pypy/-/issues/3298
    170         bp = struct.pack(
    171             'HL',
    172             bp.bf_len, ctypes.addressof(bp.bf_insns.contents)
    173         )

File /usr/local/lib/python3.8/dist-packages/scapy/arch/common.py:150, in compile_filter(filter_exp, iface, linktype, promisc)
    148 error = bytes(bytearray(err)).strip(b"\x00")
    149 if error:
--> 150     raise OSError(error)
    151 ret = pcap_compile(
    152     pcap, ctypes.byref(bpf), bpf_filter, 0, -1
    153 )
    154 pcap_close(pcap)

OSError: b'lo0: No such device exists (SIOCGIFHWADDR: No such device)'

TCP¶

In [20]:
import random
from scapy.all import ICMP, IP, sr1, TCP

# Define end host and TCP port range
host = "192.168.56.200"
port_range = [22, 23, 80, 443, 3389]

# Send SYN with random Src Port for each Dst port
for dst_port in port_range:
    src_port = random.randint(1025,65534)
    resp = sr1(
        IP(dst=host)/TCP(sport=src_port,dport=dst_port,flags="S"),timeout=1,
        verbose=0,
    )

    if resp is None:
        print(f"{host}:{dst_port} is filtered (silently dropped).")

    elif(resp.haslayer(TCP)):
        if(resp.getlayer(TCP).flags == 0x12):
            # Send a gratuitous RST to close the connection
            send_rst = sr(
                IP(dst=host)/TCP(sport=src_port,dport=dst_port,flags='R'),
                timeout=1,
                verbose=0,
            )
            print(f"{host}:{dst_port} is open.")

        elif (resp.getlayer(TCP).flags == 0x14):
            print(f"{host}:{dst_port} is closed.")

    elif(resp.haslayer(ICMP)):
        if(
            int(resp.getlayer(ICMP).type) == 3 and
            int(resp.getlayer(ICMP).code) in [1,2,3,9,10,13]
        ):
            print(f"{host}:{dst_port} is filtered (silently dropped).")
192.168.56.200:22 is filtered (silently dropped).
192.168.56.200:23 is filtered (silently dropped).
192.168.56.200:80 is filtered (silently dropped).
192.168.56.200:443 is filtered (silently dropped).
192.168.56.200:3389 is filtered (silently dropped).
In [ ]: