This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
gensec-westgate-djw2/final/tools.py
2024-06-14 00:57:48 -07:00

161 lines
4.9 KiB
Python

from datetime import date, datetime
import json
import threading
import random
from pyshark import FileCapture, packet as Packet
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain.chains import LLMChain
from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper
from scapy import *
import requests
from scapy.all import rdpcap, wrpcap
from scapy.layers.inet import IP, TCP, UDP
"""
"""
shell_tool = ShellTool()
"""
"""
class TCPDump(BaseModel):
params: str = Field(
description="""
A JSON string including `interface` the network interface name and `flags`, any other additional flags or arguments for tcpdump.
Do not include any indication of duration or output file, as this is handled by the tool
"""
)
@tool(
"Perform packet capture on a particular network interface using tcpdump. This will return a pcap file which can be analyzed by other tools",
args_schema=TCPDump,
return_direct=False,
)
def tcp_dump(json_params: str) -> str:
"""Must pass parameters to `tcpdump` including arguments and flags to perform packet monitoring and analysis"""
json_params = json_params.replace("\\", "").replace("`", "").replace("'", "")
print("params ", json_params)
json_obj = json.loads(json_params)
interface = json_obj.get("interface", "")
flags = json_obj.get("flags", "")
# print('date ',date())
file_name = f'captures/{interface}-{datetime.now().strftime("%Y-%m-%d-%H-%M")}.pcap'
shell_tool.run(
{
"commands": [
f"sudo timeout 10 tcpdump -i {interface} -w {file_name} {flags} "
]
}
)
filtered = filter_unique_packets(file_name)
return filtered
class TShark(BaseModel):
params: str = Field(
description="""A JSON string including `file` the name of the pcap file to analysize, and `flags` and flags any other additional flags or arguments for tshark"""
)
@tool(
"Perform packet analysis of a pcap file tshark",
args_schema=TShark,
return_direct=False,
)
def tshark(json_params: str) -> str:
"""Must pass all parameters to `tshark` including arguments and flags to perform packet analysis"""
json_params = json_params.replace("\\", "").replace("`", "").replace("'", "")
print("params ", json_params)
json_obj = json.loads(json_params)
file = json_obj.get("file", "")
flags = json_obj.get("flags", "")
class Image(BaseModel):
params: str = Field(
description="""A Graphviz decription of network characteristics"""
)
@tool(
"Create an image of a network",
args_schema=TShark,
return_direct=False,
)
def dalle(params: str) -> str:
"""Must provive a Graphviz decription of network characteristics to pass to an image generation tool"""
prompt = f"""
Use the below Graphviz description to create an image of network topology
```
{params}
```
"""
image_url = DallEAPIWrapper().run(prompt)
return image_url
# From hw6
class Iwconfig(BaseModel):
params: str = Field(
description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string"
)
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
def get_wireless_interface(params: str) -> str:
"""Return wireless interface information via iwconfig"""
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
print("params ", params)
res = shell_tool.run({"commands": [f"iwconfig {params}"]})
return res
@tool
def ip_loc(address):
"""Get information from an ip address, including geolocation"""
url = f"http://ipwho.is/{address}"
response = requests.get(url)
if response.status_code == 200:
return response.json()
# Filter unique packets from a pcap file with pyshark
def filter_unique_packets(pcap_file: str):
packets = rdpcap(pcap_file)
unique_packets = set()
filtered_packets = []
for packet in packets:
try:
if IP in packet and (TCP in packet or UDP in packet):
ip_layer = packet[IP]
src_ip = ip_layer.src
dst_ip = ip_layer.dst
if TCP in packet:
protocol = "TCP"
src_port = packet[TCP].sport
dst_port = packet[TCP].dport
elif UDP in packet:
protocol = "UDP"
src_port = packet[UDP].sport
dst_port = packet[UDP].dport
packet_signature = (src_ip, dst_ip, protocol, src_port, dst_port)
if packet_signature not in unique_packets:
unique_packets.add(packet_signature)
filtered_packets.append(packet)
except AttributeError:
continue