more work on final

This commit is contained in:
David Westgate 2024-06-14 12:40:48 -07:00
parent 4d86bb01f4
commit fa5a30db6e
5 changed files with 70 additions and 73 deletions

3
.gitignore vendored
View File

@ -10,4 +10,5 @@ rag_data
*wordlist*
*dumps*
.aider*
*pcap
*pcap
*test.py

View File

@ -1,9 +1,24 @@
###### David Westgate 14 June 2024
## Final Project for gensec
This project is an LLM rag chain which intends to help the user with network analysis and forensics.
Tools are provided to assist the user with capturing packets via `tcpdump`, provide summaries of these packet captures,
perform geographic IP lookup, and provide a visual of network traffic.
## Prerequisites
This appliction utilized some shell tools and assumes you are on a linux/unix like operating system.
It is necessary to install `tcpdump`. `sudo` permissions are also required
## Setup + Run
Install python3, then
```
cd final
mkdir captures
pip install -r requirements.txt
cp .env.example .env #fill in env file with key
python3 app.py
```
### Setup + Run
## Example tests
*Show a summary of network traffic on enp7s0*
*Show a visual representation of the network traffic on enp7s0*
*Show a list of IP addresses that communicate on enp7s0 and identify the countries of those addresses*

View File

@ -2,13 +2,7 @@ import os
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from tools import (
dalle,
tcp_dump,
tshark,
get_wireless_interface,
ip_loc
)
from tools import dalle, tcp_dump, get_wireless_interface, ip_loc, pcap_summary
from langchain import hub
from langchain_community.tools import ShellTool
from langsmith import Client
@ -28,7 +22,7 @@ client = Client()
shell_tool = ShellTool()
llm = ChatOpenAI(model_name="gpt-4o", temperature=0)
tools = []
tools.extend([tcp_dump,ip_loc, dalle,get_wireless_interface])
tools.extend([tcp_dump, ip_loc, dalle, pcap_summary, get_wireless_interface])
base_prompt = hub.pull("langchain-ai/react-agent-template")
prompt = base_prompt.partial(
instructions="""
@ -43,7 +37,7 @@ agent_executor = AgentExecutor(
print(
"I am a packet analysis an assistant. I can perform various tasks related to packet capture files."
"I am a packet analysis an assistant. I can perform various tasks related to packet capture files, including capture, summarization, IP lookups, and visualization."
)
print(f"I am configured with the following tools")

8
final/requirements.txt Normal file
View File

@ -0,0 +1,8 @@
langchain
langchain_community
langchain_core
langchain_openai
langsmith
python-dotenv
Requests
scapy

View File

@ -1,37 +1,23 @@
from datetime import date, datetime
import json
import threading
import random
from pyshark import FileCapture, packet as Packet
from datetime import datetime
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain.chains import LLMChain
from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper
from scapy import *
import requests
from scapy.all import rdpcap, wrpcap
from scapy.layers.inet import IP, TCP, UDP
"""
"""
from scapy.layers.inet import IP, TCP, UDP, Packet, PacketList
shell_tool = ShellTool()
"""
"""
class TCPDump(BaseModel):
params: str = Field(
description="""
A JSON string including `interface` the network interface name and `flags`, any other additional flags or arguments for tcpdump.
Do not include any indication of duration or output file, as this is handled by the tool
"""
interface: str = Field(
description="""The name of the interface to use with tcpdump. May be 'any'"""
)
@ -40,70 +26,59 @@ class TCPDump(BaseModel):
args_schema=TCPDump,
return_direct=False,
)
def tcp_dump(json_params: str) -> str:
"""Must pass parameters to `tcpdump` including arguments and flags to perform packet monitoring and analysis"""
json_params = json_params.replace("\\", "").replace("`", "").replace("'", "")
print("params ", json_params)
json_obj = json.loads(json_params)
interface = json_obj.get("interface", "")
flags = json_obj.get("flags", "")
# print('date ',date())
def tcp_dump(interface: str) -> str:
"""Must pass interface name to `tcpdump`"""
file_name = f'captures/{interface}-{datetime.now().strftime("%Y-%m-%d-%H-%M")}.pcap'
shell_tool.run(
{
"commands": [
f"sudo timeout 10 tcpdump -i {interface} -w {file_name} {flags} "
f"sudo timeout 10 tcpdump -i {interface} -w {file_name} ",
"sleep 2s",
]
}
)
filtered = filter_unique_packets(file_name)
filtered: str = filter_unique_packets(file_name)
return filtered
class TShark(BaseModel):
params: str = Field(
description="""A JSON string including `file` the name of the pcap file to analysize, and `flags` and flags any other additional flags or arguments for tshark"""
)
@tool(
"Perform packet analysis of a pcap file tshark",
args_schema=TShark,
return_direct=False,
)
def tshark(json_params: str) -> str:
"""Must pass all parameters to `tshark` including arguments and flags to perform packet analysis"""
json_params = json_params.replace("\\", "").replace("`", "").replace("'", "")
print("params ", json_params)
json_obj = json.loads(json_params)
file = json_obj.get("file", "")
flags = json_obj.get("flags", "")
class Image(BaseModel):
params: str = Field(
description="""A Graphviz decription of network characteristics"""
description="""A structured text summary of netowrk information or topology"""
)
@tool(
"Create an image of a network",
args_schema=TShark,
"Create an image of a network. This tool must take as input a summary of packet information, and not a file name",
args_schema=Image,
return_direct=False,
)
def dalle(params: str) -> str:
"""Must provive a Graphviz decription of network characteristics to pass to an image generation tool"""
"""Must provide a summary of packet information"""
prompt = f"""
Use the below Graphviz description to create an image of network topology
Use the below description to create an image of network topology
```
{params}
```
"""
"""
image_url = DallEAPIWrapper().run(prompt)
return image_url
class Summary(BaseModel):
interface: str = Field(description="""The name of the pcacp file to be read""")
@tool(
"Summarize a pcap file using scapy, and return the contents",
args_schema=TCPDump,
return_direct=False,
)
def pcap_summary(file: str) -> str:
"""Must pass the path to a pcap file to return a summary of packet information and traffic data"""
cap: PacketList = rdpcap(file)
return cap.summary()
# From hw6
class Iwconfig(BaseModel):
params: str = Field(
@ -122,20 +97,21 @@ def get_wireless_interface(params: str) -> str:
@tool
def ip_loc(address):
"""Get information from an ip address, including geolocation"""
"""Get information from an ip address, including geolocation. Takes as a paramater an ip address"""
url = f"http://ipwho.is/{address}"
response = requests.get(url)
if response.status_code == 200:
return response.json()
# Filter unique packets from a pcap file with pyshark
def filter_unique_packets(pcap_file: str):
# Filter 'unique' (src,dest,protocol) packets from a pcap file with scapy. Save this as a new capture file and return the name
def filter_unique_packets(pcap_file: str) -> str:
packets = rdpcap(pcap_file)
unique_packets = set()
filtered_packets = []
for packet in packets:
for p in packets:
packet: Packet = p
try:
if IP in packet and (TCP in packet or UDP in packet):
ip_layer = packet[IP]
@ -158,3 +134,6 @@ def filter_unique_packets(pcap_file: str):
filtered_packets.append(packet)
except AttributeError:
continue
name = pcap_file.replace(".pcap", "-f.pcap")
wrpcap(name, filtered_packets)
return name