150 lines
4.6 KiB
Python
150 lines
4.6 KiB
Python
import shlex
import sys
from contextlib import redirect_stdout
from datetime import datetime
from io import StringIO

import requests
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper
from langchain_core.pydantic_v1 import BaseModel, Field
from scapy import *
from scapy.all import rdpcap, wrpcap
from scapy.layers.inet import IP, TCP, UDP, Packet, PacketList
|
|
|
|
# Module-level ShellTool instance; the tool functions in this file that run
# shell commands (tcpdump, iwconfig) go through its `run` method.
shell_tool = ShellTool()

"""

"""
|
|
|
|
|
|
# Argument schema with a single required string field, `interface`.
# Documented with comments rather than a class docstring so the generated
# schema text stays exactly as before.
class TCPDump(BaseModel):
    interface: str = Field(
        description="The name of the interface to use with tcpdump. May be 'any'",
    )
|
|
|
|
|
|
# NOTE(review): the string passed to `@tool` and the one-line docstring below are
# presumably consumed by langchain at runtime (tool name / description), so both
# are left byte-for-byte unchanged — confirm before rewording either.
@tool(
    "Perform packet capture on a particular network interface using tcpdump. This will return a pcap file which can be analyzed by other tools",
    args_schema=TCPDump,
    return_direct=False,
)
def tcp_dump(interface: str) -> str:
    """Must pass interface name to `tcpdump`"""
    # Capture file name: captures/<interface>-<timestamp to the minute>.pcap
    file_name = f'captures/{interface}-{datetime.now().strftime("%Y-%m-%d-%H-%M")}.pcap'

    # `interface` is supplied by the agent and ends up inside a `sudo` shell
    # command line, so both it and the derived file name are shell-quoted to
    # stop metacharacters (`;`, `$()`, spaces, ...) from being interpreted.
    capture_command = (
        f"sudo timeout 10 tcpdump -i {shlex.quote(interface)} -w {shlex.quote(file_name)} "
    )
    shell_tool.run(
        {
            "commands": [
                capture_command,  # `timeout 10` ends the capture after ten seconds
                "sleep 2s",
            ]
        }
    )

    # Return the name of the de-duplicated capture, not the raw one.
    filtered: str = filter_unique_packets(file_name)
    return filtered
|
|
|
|
|
|
# Argument schema with a single required string field, `params`.
# Documented with comments rather than a class docstring so the generated
# schema text stays exactly as before.
class Image(BaseModel):
    params: str = Field(
        description="A structured text summary of netowrk information or topology",
    )
|
|
|
|
|
|
# NOTE(review): the string passed to `@tool` and the one-line docstring below are
# presumably consumed by langchain at runtime, so both are kept verbatim — confirm.
@tool(
    "Create an image of a network. This tool must take as input a summary of packet information, and not a file name",
    args_schema=Image,
    return_direct=False,
)
def dalle(params: str) -> str:
    """Must provide a summary of packet information"""
    # The prompt is the fixed instruction line followed by `params` wrapped in a
    # fenced block; the empty first and last entries reproduce the leading and
    # trailing newline of the original triple-quoted template.
    prompt_lines = [
        "",
        "Use the below description to create an image of network topology",
        "```",
        f"{params}",
        "```",
        "",
    ]
    prompt = "\n".join(prompt_lines)

    # A fresh wrapper is created per call; its `run` result is returned as-is.
    generator = DallEAPIWrapper()
    return generator.run(prompt)
|
|
|
|
|
|
# Argument schema for `pcap_summary`. The field is named `file` so that the
# validated arguments line up with the function's `file` parameter.
class Summary(BaseModel):
    file: str = Field(description="""The name of the pcacp file to be read""")


# NOTE(review): the string passed to `@tool` and the one-line docstring below are
# presumably consumed by langchain at runtime, so both are kept verbatim — confirm.
@tool(
    "Summarize a pcap file using scapy, and return the contents",
    args_schema=Summary,
    return_direct=False,
)
def pcap_summary(file: str) -> str:
    """Must pass the path to a pcap file to return a summary of packet information and traffic data"""
    cap: PacketList = rdpcap(file)

    # `summary` prints to stdout and returns nothing, so its output is captured
    # into a buffer. The context manager restores stdout even if `summary`
    # raises, which a manual swap of `sys.stdout` would not.
    stdout_capture = StringIO()
    with redirect_stdout(stdout_capture):
        cap.summary()

    return stdout_capture.getvalue()
|
|
|
|
|
|
# From hw6
|
|
# Argument schema with a single required string field, `params`.
# Documented with comments rather than a class docstring so the generated
# schema text stays exactly as before.
class Iwconfig(BaseModel):
    params: str = Field(
        description=(
            "should be command line parameters to 'iwconfig'. "
            "If none are needed, this should be left as an empty string"
        )
    )
|
|
|
|
|
|
# NOTE(review): the string passed to `@tool` and the one-line docstring below are
# presumably consumed by langchain at runtime, so both are kept verbatim — confirm.
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
def get_wireless_interface(params: str) -> str:
    """Return wireless interface information via iwconfig"""
    params = params.replace("`", "").replace("\n", "")  # fix buggy input from LLM
    print("params ", params)

    # `params` is model-generated text that ends up on a shell command line.
    # Split it into arguments the way a shell would, then re-quote each one so
    # metacharacters (`;`, `|`, `$()`, ...) reach iwconfig as literal text
    # instead of being interpreted by the shell.
    try:
        arguments = shlex.split(params)
    except ValueError:
        # Unbalanced quotes: pass the whole string through as one literal argument.
        arguments = [params]

    command = " ".join(["iwconfig", *(shlex.quote(arg) for arg in arguments)])
    res = shell_tool.run({"commands": [command]})
    return res
|
|
|
|
|
|
# NOTE(review): with a bare `@tool`, the docstring below is presumably used by
# langchain as the tool description, so it is kept verbatim — confirm.
@tool
def ip_loc(address):
    """Get information from an ip address, including geolocation. Takes as a paramater an ip address. Do not use this tool with IP adresses in a reserve range or on LAN"""
    url = f"http://ipwho.is/{address}"

    # Without a timeout `requests.get` can block forever on an unresponsive
    # server; bound the wait so the calling agent is never stalled indefinitely.
    response = requests.get(url, timeout=10)

    if response.status_code == 200:
        return response.json()

    # Any non-200 reply yields no data (previously an implicit fall-through).
    return None
|
|
|
|
|
|
def filter_unique_packets(pcap_file: str) -> str:
    """Write a de-duplicated copy of a capture file and return the new file's name.

    Only packets that carry an IP layer together with a TCP or UDP layer are
    considered. For each distinct (source IP, destination IP, protocol,
    source port, destination port) combination the first packet seen is kept;
    later packets with the same combination are dropped.

    Args:
        pcap_file: Path of the capture file to read.

    Returns:
        The path of the filtered capture: ``pcap_file`` with ``-f`` inserted
        before a trailing ``.pcap``, or with ``-f.pcap`` appended when the
        input does not end in ``.pcap``.
    """
    packets = rdpcap(pcap_file)
    seen_signatures = set()
    filtered_packets = []

    for packet in packets:
        try:
            if IP not in packet:
                continue

            # TCP takes precedence when both layers are present.
            if TCP in packet:
                protocol, transport = "TCP", packet[TCP]
            elif UDP in packet:
                protocol, transport = "UDP", packet[UDP]
            else:
                continue

            ip_layer = packet[IP]
            packet_signature = (
                ip_layer.src,
                ip_layer.dst,
                protocol,
                transport.sport,
                transport.dport,
            )
        except AttributeError:
            # Best effort: skip packets whose layers lack the expected fields.
            continue

        if packet_signature not in seen_signatures:
            seen_signatures.add(packet_signature)
            filtered_packets.append(packet)

    # Only touch the trailing extension. A plain `str.replace` would also
    # rewrite ".pcap" inside directory names, and would return the input path
    # unchanged (overwriting the source capture) when the suffix is missing.
    suffix = ".pcap"
    if pcap_file.endswith(suffix):
        name = f"{pcap_file[:-len(suffix)]}-f{suffix}"
    else:
        name = f"{pcap_file}-f{suffix}"

    wrpcap(name, filtered_packets)
    return name
|