from datetime import datetime from io import StringIO import sys from time import sleep from langchain_core.pydantic_v1 import BaseModel, Field from langchain.tools import tool from langchain_community.tools import ShellTool from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper from scapy import * import requests from scapy.all import rdpcap, wrpcap from scapy.layers.inet import IP, TCP, UDP, Packet, PacketList shell_tool = ShellTool() """ """ class TCPDump(BaseModel): interface: str = Field( description="""The name of the interface to use with tcpdump. May be 'any'""" ) @tool( "Perform packet capture on a particular network interface using tcpdump. This will return a pcap file which can be analyzed by other tools", args_schema=TCPDump, return_direct=False, ) def tcp_dump(interface: str) -> str: """Must pass interface name to `tcpdump`""" file_name = f'captures/{interface}-{datetime.now().strftime("%Y-%m-%d-%H-%M")}.pcap' shell_tool.run( { "commands": [ f"sudo timeout 10 tcpdump -i {interface} -w {file_name} ", "sleep 2s", ] } ) filtered: str = filter_unique_packets(file_name) return filtered class Image(BaseModel): params: str = Field( description="""A structured text summary of netowrk information or topology""" ) @tool( "Create an image of a network. This tool must take as input a summary of packet information, and not a file name. If necessary, condense input to 900 characters or less", args_schema=Image, return_direct=False, ) def dalle(params: str) -> str: """Must provide a summary of packet information""" prompt = f""" Use the below description to create an image of network topology ``` {params} ``` """ image_url = DallEAPIWrapper().run(prompt) return image_url class Summary(BaseModel): interface: str = Field(description="""The name of the pcacp file to be read""") @tool( "Summarize a pcap file using scapy, and return the contents", args_schema=TCPDump, return_direct=False, ) def pcap_summary(file: str) -> str: """Must pass the path to a pcap file to return a summary of packet information and traffic data""" cap: PacketList = rdpcap(file) # Workaround to get `summary` to a variable. It normally prints to stdout and returns nothing stdout_capture = StringIO() save_stdout = sys.stdout sys.stdout = stdout_capture cap.summary() sys.stdout = save_stdout return stdout_capture.getvalue() # From hw6 class Iwconfig(BaseModel): params: str = Field( description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string" ) @tool("Get interface adapter name", args_schema=Iwconfig, return_direct=False) def get_adapter_interface(params: str) -> str: """Network interface adapters via iwconfig""" params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM print("params ", params) res = shell_tool.run({"commands": [f"iwconfig {params}"]}) return res @tool def ip_loc(address): """Get information from an ip address, including geolocation. Takes as a paramater an ip address. Do not use this tool with IP adresses in a reserve range or on LAN""" url = f"http://ipwho.is/{address}" sleep(2) response = requests.get(url) if response.status_code == 200: return response.json() # Filter 'unique' (src,dest,protocol) packets from a pcap file with scapy. Save this as a new capture file and return the name def filter_unique_packets(pcap_file: str) -> str: packets = rdpcap(pcap_file) unique_packets = set() filtered_packets = [] for p in packets: packet: Packet = p try: if IP in packet and (TCP in packet or UDP in packet): ip_layer = packet[IP] src_ip = ip_layer.src dst_ip = ip_layer.dst if TCP in packet: protocol = "TCP" src_port = packet[TCP].sport dst_port = packet[TCP].dport elif UDP in packet: protocol = "UDP" src_port = packet[UDP].sport dst_port = packet[UDP].dport packet_signature = (src_ip, dst_ip, protocol, src_port, dst_port) if packet_signature not in unique_packets: unique_packets.add(packet_signature) filtered_packets.append(packet) except AttributeError: continue name = pcap_file.replace(".pcap", "-f.pcap") wrpcap(name, filtered_packets) return name