This notebook is to explore the use of Scapy, a network packet tool developed in Python. Scapy can be used as a standalone application or as a Python library - we will use the library to keep within our notebook environment.
Note: To execute most commands in Scapy, you will require root privileges. It is not normally recommended to start JupyterLab with root privileges; however, we will do so purely for the purpose of this lab activity. For JupyterLab, you can start your notebook session by typing:
You should open Wireshark on your UWEcyber VM whilst interacting with this notebook, to observe the traffic behaviour.
For the first tasks below, use the 'icmp' filter in Wireshark on your network interface.
You will find more useful resources for Scapy at:
from scapy.all import ICMP, IP, sr1
# Send one ICMP echo request and wait at most two seconds for the reply.
# sr1() returns None when nothing answers in time, so check before calling
# .summary() on the result instead of blocking forever or raising.
reply = sr1(IP(dst="192.168.59.200")/ICMP(), timeout=2)
if reply is None:
    print("No reply from 192.168.59.200")
else:
    print(reply.summary())
Begin emission: Finished sending 1 packets. Received 2 packets, got 1 answers, remaining 0 packets .*IP / ICMP 192.168.59.200 > 192.168.59.136 echo-reply 0 / Padding
from scapy.all import sniff
# Capture ten frames matching the BPF filter "arp".
pkts = sniff(filter="arp", count=10)
# PacketList.summary() prints one line per packet itself and returns None,
# so wrapping it in print() would emit a stray "None" after the listing.
pkts.summary()
Ether / ARP who has 192.168.0.154 says 0.0.0.0 / Padding Ether / ARP who has 192.168.0.104 says 192.168.0.104 / Padding Ether / ARP who has 192.168.0.1 says 192.168.0.154 / Padding Ether / ARP who has 192.168.0.1 says 192.168.0.139 / Padding Ether / ARP who has 192.168.0.240 says 192.168.0.240 / Padding Ether / ARP who has 192.168.0.1 says 192.168.0.194 / Padding Ether / ARP who has 192.168.0.1 says 192.168.0.25 Ether / ARP is at ac:f8:cc:c6:f9:8b says 192.168.0.1 / Padding Ether / ARP who has 192.168.0.50 says 192.168.0.50 / Padding Ether / ARP who has 192.168.0.35 says 192.168.0.35 / Padding None
from scapy.all import ARP, sniff
def arp_display(pkt):
    """Describe an ARP packet as a one-line string.

    Returns a "Request: ..." line for op 1 (who-has), a "*Response: ..."
    line for op 2 (is-at), and None for any other op value.
    """
    arp = pkt[ARP]
    opcode = arp.op
    if opcode == 1:  # who-has (request)
        return f"Request: {arp.psrc} is asking about {arp.pdst}"
    if opcode == 2:  # is-at (response)
        return f"*Response: {arp.hwsrc} has address {arp.psrc}"
    return None
# Watch ten ARP frames, handing each one to arp_display(); store=0 means
# the frames themselves are not kept in the returned packet list.
sniff(filter="arp", prn=arp_display, count=10, store=0)
Request: 192.168.0.23 is asking about 192.168.0.23 Request: 192.168.0.52 is asking about 192.168.0.52 Request: 192.168.0.100 is asking about 192.168.0.1 Request: 192.168.0.233 is asking about 192.168.0.233 Request: 192.168.0.104 is asking about 192.168.0.104 Request: 192.168.0.15 is asking about 192.168.0.1 Request: 0.0.0.0 is asking about 192.168.0.154 Request: 192.168.0.240 is asking about 192.168.0.240 Request: 192.168.0.50 is asking about 192.168.0.50 Request: 192.168.0.35 is asking about 192.168.0.35
<Sniffed: TCP:0 UDP:0 ICMP:0 Other:0>
from random import randint
from scapy.all import IP, TCP, send
# Create the skeleton of our packet
template = IP(dst="192.168.59.200")/TCP()

# Start lighting up those bits! URG + FIN + PSH is the "Xmas tree" pattern.
template[TCP].flags = 'UFP'

# Create a list with a large number of packets to send.
# Each packet is an explicit, independent copy of the template with its own
# random TCP dest port for attack obfuscation. Copying explicitly avoids
# depending on list.extend() implicitly iterating a Scapy packet, and
# removes the index bookkeeping.
xmas = []
for _ in range(100):
    pkt = template.copy()
    pkt[TCP].dport = randint(1, 65535)
    xmas.append(pkt)

# Send the list of packets
send(xmas)
.................................................................................................... Sent 100 packets.
from scapy.all import DNS, DNSQR, IP, sr1, UDP
# Recursive (rd=1) query for www.thepacketgeek.com sent to 8.8.8.8.
dns_req = IP(dst='8.8.8.8')/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname='www.thepacketgeek.com'))
# UDP gives no delivery guarantee: bound the wait so a lost datagram cannot
# hang the cell, and handle the None that sr1() returns on timeout.
answer = sr1(dns_req, timeout=2, verbose=0)
if answer is None or DNS not in answer:
    print("No DNS response received from 8.8.8.8")
else:
    print(answer[DNS].summary())
DNS Ans "b'thepacketgeek.github.io.'"
from scapy.all import DNS, DNSQR, IP, sr1, UDP
# Recursive (rd=1) query for uwe.ac.uk sent to 8.8.8.8.
dns_req = IP(dst='8.8.8.8')/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname='uwe.ac.uk'))
# UDP gives no delivery guarantee: bound the wait so a lost datagram cannot
# hang the cell, and handle the None that sr1() returns on timeout.
answer = sr1(dns_req, timeout=2, verbose=0)
if answer is None or DNS not in answer:
    print("No DNS response received from 8.8.8.8")
else:
    print(answer[DNS].summary())
DNS Ans "164.11.4.40"
from scapy.all import DNS, DNSQR, DNSRR, IP, send, sniff, sr1, UDP
import sys

# The loopback interface is named "lo" on Linux and "lo0" on macOS/BSD.
# A hard-coded "lo0" makes sniff() fail on Linux with
# "lo0: No such device exists", so pick the name from the running platform.
IFACE = "lo" if sys.platform.startswith("linux") else "lo0"  # Or your default interface
DNS_SERVER_IP = "127.0.0.1"  # Your local IP
# Only DNS traffic (UDP port 53) addressed to this responder is captured.
BPF_FILTER = f"udp port 53 and ip dst {DNS_SERVER_IP}"
def dns_responder(local_ip: str):
    """Build a sniff() callback that spoofs or forwards DNS queries.

    Queries whose name contains "trailers.apple.com" are answered with a
    single record whose rdata is *local_ip*. Every other query is relayed to
    8.8.8.8 and the upstream DNS layer is sent back to the original client.

    Args:
        local_ip: Address placed in the rdata of the spoofed answer.

    Returns:
        A function that takes one sniffed packet and returns a status string,
        or None when the packet is not a standard DNS query.
    """

    def forward_dns(orig_pkt: IP):
        """Relay the query to 8.8.8.8 and return the answer to the client."""
        print(f"Forwarding: {orig_pkt[DNSQR].qname}")
        response = sr1(
            IP(dst='8.8.8.8')/
            UDP(sport=orig_pkt[UDP].sport)/
            DNS(rd=1, id=orig_pkt[DNS].id, qd=DNSQR(qname=orig_pkt[DNSQR].qname)),
            timeout=2,  # do not stall the sniffer if the upstream reply is lost
            verbose=0,
        )
        if response is None or DNS not in response:
            return f"No upstream answer for {orig_pkt[IP].src}"
        resp_pkt = IP(dst=orig_pkt[IP].src, src=DNS_SERVER_IP)/UDP(dport=orig_pkt[UDP].sport)/DNS()
        resp_pkt[DNS] = response[DNS]
        # Send on the same interface as the spoofed path below.
        send(resp_pkt, verbose=0, iface=IFACE)
        return f"Responding to {orig_pkt[IP].src}"

    def get_response(pkt: IP):
        """Handle one sniffed packet; ignore anything but a standard query."""
        # qr == 0 restricts handling to queries. Replies emitted by this
        # responder can match the capture filter too (e.g. on loopback, where
        # client and server share an address) and must not be re-processed.
        if (
            DNS in pkt and
            DNSQR in pkt and
            pkt[DNS].qr == 0 and
            pkt[DNS].opcode == 0 and
            pkt[DNS].ancount == 0
        ):
            if "trailers.apple.com" in str(pkt[DNSQR].qname):
                # Mark the packet as a response (qr=1), echo the question
                # section back, and leave ancount unset so Scapy derives it
                # from the single answer record.
                spf_resp = (
                    IP(dst=pkt[IP].src)/
                    UDP(dport=pkt[UDP].sport, sport=53)/
                    DNS(
                        id=pkt[DNS].id,
                        qr=1,
                        aa=1,
                        qd=pkt[DNS].qd,
                        an=DNSRR(rrname=pkt[DNSQR].qname, rdata=local_ip),
                    )
                )
                send(spf_resp, verbose=0, iface=IFACE)
                return f"Spoofed DNS Response Sent: {pkt[IP].src}"

            else:
                # make DNS query, capturing the answer and send the answer
                return forward_dns(pkt)

    return get_response
# No count or timeout is given, so this capture runs until interrupted.
# store=0 (as in the ARP monitor) keeps Scapy from accumulating every
# captured packet in memory for the lifetime of the responder.
sniff(filter=BPF_FILTER, prn=dns_responder(DNS_SERVER_IP), iface=IFACE, store=0)
--------------------------------------------------------------------------- OSError Traceback (most recent call last) Cell In [16], line 41 37 return forward_dns(pkt) 39 return get_response ---> 41 sniff(filter=BPF_FILTER, prn=dns_responder(DNS_SERVER_IP), iface=IFACE) File /usr/local/lib/python3.8/dist-packages/scapy/sendrecv.py:1263, in sniff(*args, **kwargs) 1259 @conf.commands.register 1260 def sniff(*args, **kwargs): 1261 # type: (*Any, **Any) -> PacketList 1262 sniffer = AsyncSniffer() -> 1263 sniffer._run(*args, **kwargs) 1264 return cast(PacketList, sniffer.results) File /usr/local/lib/python3.8/dist-packages/scapy/sendrecv.py:1127, in AsyncSniffer._run(self, count, store, offline, quiet, prn, lfilter, L2socket, timeout, opened_socket, stop_filter, iface, started_callback, session, session_kwargs, **karg) 1121 sniff_sockets.update( 1122 (L2socket(type=ETH_P_ALL, iface=ifname, **karg), 1123 iflabel) 1124 for ifname, iflabel in six.iteritems(iface) 1125 ) 1126 else: -> 1127 sniff_sockets[L2socket(type=ETH_P_ALL, iface=iface, 1128 **karg)] = iface 1130 # Get select information from the sockets 1131 _main_socket = next(iter(sniff_sockets)) File /usr/local/lib/python3.8/dist-packages/scapy/arch/linux.py:497, in L2Socket.__init__(self, iface, type, promisc, filter, nofilter, monitor) 495 if filter is not None: 496 try: --> 497 attach_filter(self.ins, filter, self.iface) 498 except ImportError as ex: 499 log_runtime.error("Cannot set filter: %s", ex) File /usr/local/lib/python3.8/dist-packages/scapy/arch/linux.py:166, in attach_filter(sock, bpf_filter, iface) 157 def attach_filter(sock, bpf_filter, iface): 158 # type: (socket.socket, str, Union[NetworkInterface, str]) -> None 159 """ 160 Compile bpf filter and attach it to a socket 161 (...) 
164 :param iface: the interface used to compile 165 """ --> 166 bp = compile_filter(bpf_filter, iface) 167 if conf.use_pypy and sys.pypy_version_info <= (7, 3, 2): # type: ignore 168 # PyPy < 7.3.2 has a broken behavior 169 # https://foss.heptapod.net/pypy/pypy/-/issues/3298 170 bp = struct.pack( 171 'HL', 172 bp.bf_len, ctypes.addressof(bp.bf_insns.contents) 173 ) File /usr/local/lib/python3.8/dist-packages/scapy/arch/common.py:150, in compile_filter(filter_exp, iface, linktype, promisc) 148 error = bytes(bytearray(err)).strip(b"\x00") 149 if error: --> 150 raise OSError(error) 151 ret = pcap_compile( 152 pcap, ctypes.byref(bpf), bpf_filter, 0, -1 153 ) 154 pcap_close(pcap) OSError: b'lo0: No such device exists (SIOCGIFHWADDR: No such device)'
import random

from scapy.all import ICMP, IP, send, sr1, TCP
# Define end host and TCP port range
# NOTE(review): earlier cells target 192.168.59.200 - confirm that the
# 192.168.56.x address used here is the intended host.
host = "192.168.56.200"
port_range = [22, 23, 80, 443, 3389]

# Send SYN with random Src Port for each Dst port
for dst_port in port_range:
    src_port = random.randint(1025, 65534)
    resp = sr1(
        IP(dst=host)/TCP(sport=src_port, dport=dst_port, flags="S"),
        timeout=1,
        verbose=0,
    )

    if resp is None:
        # Nothing came back within the timeout.
        print(f"{host}:{dst_port} is filtered (silently dropped).")

    elif resp.haslayer(TCP):
        tcp_flags = resp.getlayer(TCP).flags
        if tcp_flags == 0x12:  # SYN+ACK: the port accepted the handshake
            # Send a gratuitous RST to close the connection. A RST expects no
            # reply, so send() is used rather than a send-and-receive call;
            # seq is taken from the peer's ack so the reset is in sequence.
            send(
                IP(dst=host)/TCP(
                    sport=src_port,
                    dport=dst_port,
                    flags='R',
                    seq=resp.getlayer(TCP).ack,
                ),
                verbose=0,
            )
            print(f"{host}:{dst_port} is open.")

        elif tcp_flags == 0x14:  # RST+ACK: nothing is listening
            print(f"{host}:{dst_port} is closed.")

    elif resp.haslayer(ICMP):
        icmp = resp.getlayer(ICMP)
        # Type 3 is "destination unreachable"; these codes are the ones the
        # scan treats as filtering. An explicit ICMP error is not a silent
        # drop, so the message says what was actually received.
        if int(icmp.type) == 3 and int(icmp.code) in [1, 2, 3, 9, 10, 13]:
            print(f"{host}:{dst_port} is filtered (ICMP unreachable received).")
192.168.56.200:22 is filtered (silently dropped). 192.168.56.200:23 is filtered (silently dropped). 192.168.56.200:80 is filtered (silently dropped). 192.168.56.200:443 is filtered (silently dropped). 192.168.56.200:3389 is filtered (silently dropped).