This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
gensec-westgate-djw2/final/tools.py
2024-06-15 03:12:39 -07:00

154 lines
5.1 KiB
Python

from datetime import datetime
from io import StringIO
import sys
from time import sleep
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper
from scapy import *
import requests
from scapy.all import rdpcap, wrpcap
from scapy.layers.inet import IP, TCP, UDP, Packet, PacketList
# Module-level ShellTool instance; the tools in this file call its `run`
# method to execute shell commands (tcpdump, iwconfig).
shell_tool = ShellTool()
"""
The tools here rely on a mix of mechanisms (shell commands, scapy, and web APIs), but all of them serve to assist with network analysis.
"""
class TCPDump(BaseModel):
    # Argument schema for the packet-capture tool.
    # `interface` is handed to tcpdump's `-i` option.
    interface: str = Field(
        description="The name of the interface to use with tcpdump. May be 'any'",
    )
@tool(
    "Perform packet capture on a particular network interface using tcpdump. This will return a pcap file which can be analyzed by other tools",
    args_schema=TCPDump,
    return_direct=False,
)
def tcp_dump(interface: str) -> str:
    """Must pass interface name to `tcpdump`"""
    # NOTE: the docstring above is left verbatim because langchain's @tool
    # reads it at runtime; developer documentation lives in these comments.
    #
    # Captures traffic on `interface` with tcpdump (bounded to 10 seconds by
    # `timeout`), writes it to a timestamped file under `captures/`, then
    # de-duplicates the capture with `filter_unique_packets`.
    #
    # Args:
    #     interface: name of the network interface to capture on.
    # Returns:
    #     Path of the filtered capture file produced by `filter_unique_packets`.
    import shlex

    file_name = f'captures/{interface}-{datetime.now().strftime("%Y-%m-%d-%H-%M")}.pcap'
    # `interface` comes from the LLM and ends up in a `sudo` shell command, so
    # both it and the derived path are quoted to stop shell metacharacters
    # (`;`, `|`, `$(...)`, spaces) from being interpreted by the shell.
    capture_cmd = (
        f"sudo timeout 10 tcpdump -i {shlex.quote(interface)} "
        f"-w {shlex.quote(file_name)} "
    )
    shell_tool.run(
        {
            "commands": [
                capture_cmd,
                # Short pause after the capture; presumably gives tcpdump time
                # to finish writing the file -- TODO confirm.
                "sleep 2s",
            ]
        }
    )
    filtered: str = filter_unique_packets(file_name)
    return filtered
"""
This tool wraps the network summary in a prompt with some context before sending it off to DALL-E.
"""
class Image(BaseModel):
    # Argument schema for the image-generation tool.
    # `params` carries the text that gets embedded in the image prompt.
    params: str = Field(
        description="A structured text summary of netowrk information or topology",
    )
@tool(
    "Create a visual image of a network. This tool must take as input a summary of packet information, and not a file name. If necessary, condense input to 900 characters or less",
    args_schema=Image,
    return_direct=False,
)
def dalle(params: str) -> str:
    """Must provide a summary of packet information"""
    # Wraps the caller's summary in a fenced block beneath a one-line
    # instruction, sends that prompt through DallEAPIWrapper, and returns
    # whatever `run` gives back (stored as an image URL by the original code).
    instruction = "\nUse the below description to create an image of network topology\n"
    fenced_summary = f"```\n{params}\n```\n"
    generator = DallEAPIWrapper()
    return generator.run(instruction + fenced_summary)
class Summary(BaseModel):
    # Argument schema for `pcap_summary`. The field is named `file` so that it
    # matches the tool function's parameter name.
    file: str = Field(description="""The name of the pcap file to be read""")


@tool(
    "Summarize a pcap file using scapy, and return the contents",
    args_schema=Summary,
    return_direct=False,
)
def pcap_summary(file: str) -> str:
    """Must pass the path to a pcap file to return a summary of packet information and traffic data"""
    # Reads the capture with scapy and returns the text that
    # `PacketList.summary()` would normally print.
    #
    # Args:
    #     file: path to the pcap file to read.
    # Returns:
    #     The captured summary output as a single string.
    from contextlib import redirect_stdout

    cap: PacketList = rdpcap(file)
    # `summary` prints to stdout and returns nothing, so capture its output.
    # `redirect_stdout` restores sys.stdout even if `summary` raises.
    captured = StringIO()
    with redirect_stdout(captured):
        cap.summary()
    return captured.getvalue()
"""
Tool borrowed from hw6 wifi cracking
"""
class Iwconfig(BaseModel):
    # Argument schema for the iwconfig tool.
    # `params` is appended to the `iwconfig` command line.
    params: str = Field(
        description=(
            "should be command line parameters to 'iwconfig'. "
            "If none are needed, this should be left as an empty string"
        )
    )
@tool("Get interface adapter name", args_schema=Iwconfig, return_direct=False)
def get_adapter_interface(params: str) -> str:
    """Network interface adapters via iwconfig"""
    # Runs `iwconfig` with the given parameters through the shared shell tool
    # and returns the command output.
    #
    # Args:
    #     params: command-line parameters for iwconfig; may be empty.
    # Returns:
    #     Whatever `shell_tool.run` returns for the command.
    import shlex

    params = params.replace("`", "").replace("\n", "")  # fix buggy input from LLM
    print("params ", params)
    # The parameters come from the LLM and are placed on a shell command line,
    # so quote each token to stop shell metacharacters from being interpreted.
    try:
        tokens = shlex.split(params)
    except ValueError:
        # Unbalanced quotes in the input: fall back to whitespace splitting.
        tokens = params.split()
    safe_params = " ".join(shlex.quote(token) for token in tokens)
    res = shell_tool.run({"commands": [f"iwconfig {safe_params}"]})
    return res
""" Src: https://github.com/wu4f/cs410g-src/blob/main/09_ThreatIntelligence/01_net_int.py """
@tool
def ip_loc(address):
    """Get information from an ip address, including geolocation. Takes as a paramater an ip address. Do not use this tool with IP adresses in a reserve range or on LAN"""
    # NOTE: the docstring above is left verbatim because langchain's @tool
    # reads it at runtime; developer documentation lives in these comments.
    #
    # Looks up `address` on ipwho.is.
    #
    # Args:
    #     address: the IP address to look up.
    # Returns:
    #     The decoded JSON body on HTTP 200, otherwise None.
    # Raises:
    #     requests.exceptions.RequestException: on connection errors or when
    #         the request times out.
    url = f"http://ipwho.is/{address}"
    # Pause before each lookup; presumably throttles calls to the public
    # API -- TODO confirm.
    sleep(2)
    # Without a timeout, `requests.get` can block indefinitely on a stalled
    # connection and hang the calling agent.
    response = requests.get(url, timeout=10)
    if response.status_code == 200:
        return response.json()
    return None
""" Filter 'unique' (src,dest,protocol) packets from a pcap file with scapy. Save this as a new capture file and return the name
This is necessary to limit input sizes to LLMs
"""
def filter_unique_packets(pcap_file: str) -> str:
    """Write a de-duplicated copy of a capture and return the new file's path.

    Only packets that carry an IP layer and a TCP or UDP layer are considered.
    The first packet seen for each (source IP, destination IP, protocol,
    source port, destination port) combination is kept; later packets with
    the same combination are dropped.

    Args:
        pcap_file: path of the capture file to read.

    Returns:
        Path of the filtered capture: the input path with ``-f`` inserted
        before the file extension (``x.pcap`` becomes ``x-f.pcap``).
    """
    import os

    seen_signatures = set()
    kept_packets = []
    for packet in rdpcap(pcap_file):
        try:
            if IP not in packet:
                continue
            if TCP in packet:
                protocol, transport = "TCP", packet[TCP]
            elif UDP in packet:
                protocol, transport = "UDP", packet[UDP]
            else:
                continue
            ip_layer = packet[IP]
            signature = (
                ip_layer.src,
                ip_layer.dst,
                protocol,
                transport.sport,
                transport.dport,
            )
        except AttributeError:
            # Skip packets whose layers lack the expected fields.
            continue
        if signature not in seen_signatures:
            seen_signatures.add(signature)
            kept_packets.append(packet)

    # Insert "-f" before the extension only. A plain
    # `str.replace(".pcap", "-f.pcap")` would also rewrite ".pcap" inside
    # directory names, and would leave the path unchanged when the extension
    # is absent, so the output would overwrite the input capture.
    extension = ".pcap"
    if pcap_file.endswith(extension):
        name = pcap_file[: -len(extension)] + "-f" + extension
    else:
        root, other_extension = os.path.splitext(pcap_file)
        name = f"{root}-f{other_extension}"
    wrpcap(name, kept_packets)
    return name