From 427e7a1ffdc6ce57c59aef8b659858236537d7c3 Mon Sep 17 00:00:00 2001 From: David Westgate Date: Thu, 6 Jun 2024 16:29:50 -0700 Subject: [PATCH] start work on app and tools --- final/app.py | 63 +++++++++++++++++++++++++++++++++++++++++++ final/tools.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+) create mode 100644 final/tools.py diff --git a/final/app.py b/final/app.py index e69de29..537d327 100644 --- a/final/app.py +++ b/final/app.py @@ -0,0 +1,63 @@ +import os +from langchain_openai import ChatOpenAI +from langchain.agents import AgentExecutor, create_react_agent +from langchain.tools import tool +from tools import ( + get_wireless_interface, + change_adapter_mode, + wifi_encryption_cracking, + deauth_and_capture, + reconnaissance, +) +from langchain import hub +from langchain_community.tools import ShellTool +from langsmith import Client +from dotenv import load_dotenv + +load_dotenv() + +""" +Main rag tool-chain application. Simply load the tools and prompt the user. +""" + +os.environ["LANGCHAIN_TRACING_V2"] = "true" +os.environ["LANGCHAIN_PROJECT"] = f"LangSmith Introduction" +os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" + +client = Client() +shell_tool = ShellTool() +llm = ChatOpenAI(model_name="gpt-4o", temperature=0) +tools = [] +tools.extend( + [ + + ] +) +base_prompt = hub.pull("langchain-ai/react-agent-template") +prompt = base_prompt.partial( + instructions=""" + You are a packet analysis assistant. Use any combination of the tools provided to best serve the users request. + If the request cannot be served with the tools provided, state why and offer advice on how the user could solve the problem. + """ +) +agent = create_react_agent(llm, tools, prompt) +agent_executor = AgentExecutor( + agent=agent, tools=tools, verbose=True, handle_parsing_errors=True +) + + +print( + "I am a packet analysis an assistant. I can perform various tasks related to packet capture files." +) +print(f"I am configured with the following tools") + +for tool in tools: + print(f" Tool: {tool.name} = {tool.description}") + +while True: + line = input("llm>> ") + if line: + result = agent_executor.invoke({"input": line}) + print(result["output"]) + else: + break diff --git a/final/tools.py b/final/tools.py new file mode 100644 index 0000000..1c9abc5 --- /dev/null +++ b/final/tools.py @@ -0,0 +1,72 @@ +import json +import threading +import random +from langchain_core.pydantic_v1 import BaseModel, Field +from langchain.tools import tool +from langchain_community.tools import ShellTool +from langchain.chains import LLMChain +from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper +from scapy import *; +""" + +""" + +shell_tool = ShellTool() + + +""" +""" + + +class TCPDump(BaseModel): + params: str = Field( + description="""A string of all parameters to the `tcpdump` command, including arguments and flags""" + ) + + +@tool( + "Perform packet analysis using tcpdump", + args_schema=TCPDump, + return_direct=True, +) +def tcp_dump(params: str) -> str: + """Must pass all parameters to `tcpdump` including arguments and flags to perform packet analysis""" + res = shell_tool.run({"commands": [f"tcpdump {params}"]}) + return res + + +class TShark(BaseModel): + params: str = Field( + description="""A string of all parameters to the `tshark` command, including arguments and flags""" + ) +@tool( + "Perform packet analysis using tshark", + args_schema=TShark, + return_direct=True, +) +def tshark(params: str) -> str: + """Must pass all parameters to `tshark` including arguments and flags to perform packet analysis""" + res = shell_tool.run({"commands": [f"tcpdump {params}"]}) + return res + + +# TODO: Scapy tool + +# @tool( +# "Perform wifi encryption cracking with aircrack-ng", +# args_schema=CrackPassword, +# return_direct=True, +# ) +# def wifi_encryption_cracking(json_params: str) -> str: +# """Must pass bssid and capfile parameters (as a string containing a json object) to aircrack-ng to perform wifi encryption cracking""" +# json_params = json_params.replace("\\", "") +# json_obj = json.loads(json_params) +# bssid = json_obj["bssid"] +# cap = json_obj["capfile"] + +# res = shell_tool.run( +# { +# "commands": [f"aircrack-ng --bssid {bssid} -w wordlist/rockyou.txt {cap}"] +# } # TODO: Abstrace out wordlist - allow custom/multiple wordlist files as long as they sit in the directory +# ) +# return res