187 lines
6.8 KiB
Python
187 lines
6.8 KiB
Python
import subprocess
|
|
import requests
|
|
import tempfile
|
|
import zipfile
|
|
import io
|
|
import random
|
|
import os
|
|
from langchain_openai import ChatOpenAI
|
|
from langchain_core.runnables import RunnablePassthrough
|
|
from langchain_core.prompts import PromptTemplate
|
|
from langchain_core.output_parsers import StrOutputParser
|
|
from langchain.agents import AgentExecutor, create_react_agent, load_tools
|
|
from langchain_core.pydantic_v1 import BaseModel, Field, root_validator
|
|
from langchain.tools import tool
|
|
from langchain import hub
|
|
from langchain_community.tools import ShellTool
|
|
from dotenv import load_dotenv
|
|
from time import time
|
|
|
|
shell_tool = ShellTool()
|
|
|
|
|
|
class CrackPassword(BaseModel):
|
|
params: str = Field(
|
|
description="Should be command line parameters to 'aircrack-ng' to perform some kind of wifi encryption cracking"
|
|
)
|
|
|
|
def get_wordlists():
|
|
directory = "wordlists"
|
|
# Check if the directory exists
|
|
if not os.path.exists(directory):
|
|
raise FileNotFoundError(f"The directory {directory} does not exist.")
|
|
|
|
# List all files in the directory
|
|
files = [
|
|
file
|
|
for file in os.listdir(directory)
|
|
if os.path.isfile(os.path.join(directory, file))
|
|
]
|
|
|
|
# Check if the list is empty
|
|
if not files:
|
|
raise Exception(f"No files found in the directory {directory}.")
|
|
|
|
# Return the first file, for the sake of simplicity. TODO: accomodate the possibility of multiple word list files
|
|
return files[0]
|
|
|
|
|
|
@tool(
|
|
"Perform wifi encryption cracking with aircrack-ng",
|
|
args_schema=CrackPassword,
|
|
return_direct=False,
|
|
)
|
|
def wifi_encryption_cracking(params: str) -> str:
|
|
"""Can pass parameters to aircrack-ng to perform wifi encryption cracking"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
hash = random.getrandbits(16)
|
|
res = shell_tool.run({"commands": [f"aircrack-ng {params}"]})
|
|
return res
|
|
|
|
class DeauthAndCapture(BaseModel):
|
|
bssid: str = Field(
|
|
description="Should be a BSSID of a wifi AP"
|
|
)
|
|
client: str = Field(description="Should be a MAC of a client connected to the above BSSID, as the target of the de-auth")
|
|
|
|
|
|
@tool(
|
|
"Perform a de-auth and capture for a given wifi BSSID and client",
|
|
args_schema=DeauthAndCapture,
|
|
return_direct=False,
|
|
)
|
|
def deauth_and_capture(bssid: str, client:str) -> str:
|
|
"""Can pass bssid and client parameters to aireplay-ng to perform and capture a de-auth"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run({"commands": [f"sudo aireplay-ng --deauth 10 -a {bssid} -c {client} wlo1mon"]})
|
|
return res
|
|
|
|
class PacketTransmission(BaseModel):
|
|
params: str = Field(
|
|
description="Should be command line parameters to 'aireplay-ng' to perform some kind of wifi frame or packet transmission"
|
|
)
|
|
|
|
|
|
@tool(
|
|
"Perform packet or wifi frame transmission with aireplay-ng",
|
|
args_schema=PacketTransmission,
|
|
return_direct=False,
|
|
)
|
|
def packet_frame_transmission(params: str) -> str:
|
|
"""Can pass parameters to aireplay-ng to perform packet or wifi frame transmission"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run({"commands": [f"sudo aireplay-ng {params}"]})
|
|
return res
|
|
|
|
|
|
class PacketCapture(BaseModel):
|
|
params: str = Field(
|
|
description="""Should be command line parameters to 'airodump-ng' to perform some kind of wifi reconnaissance (determine BSSID and channel of a network) or perform packet capture for a specified BSSID and channel.
|
|
Do not pass a -w, --write, or -o parameter, as this is already handled"""
|
|
)
|
|
|
|
|
|
@tool(
|
|
"Perform packet capture or wifi reconnaissance with airodump-ng. Can be used to find confirm a BSSID and channel, or to capture a handshake for a known BSSID and channel",
|
|
args_schema=PacketCapture,
|
|
return_direct=False,
|
|
)
|
|
def packet_capture_reconnaissance(params: str) -> str:
|
|
"""Can pass parameters to airodump-ng to gather information about a BSSID such as the channel, or to capture a handshake for a specified BSSID and channel"""
|
|
hash = random.getrandbits(16)
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run(
|
|
{
|
|
"commands": [
|
|
f"sudo timeout -s SIGKILL 15 airodump-ng --output-format csv --write dumps/dump-{hash} {params} "
|
|
]
|
|
}
|
|
)
|
|
fp = open(f"dumps/dump-{hash}-01.csv", "r")
|
|
contents = fp.read()
|
|
fp.close()
|
|
return contents
|
|
|
|
|
|
class IwScan(BaseModel):
|
|
interface: str = Field(
|
|
description="Should be a wireless interface name, used as a paramater to 'iw' to scan for wifi networks"
|
|
)
|
|
|
|
# network: str = Field(
|
|
# description="Should be the name or SSID of the wifi network you are interested in"
|
|
# )
|
|
|
|
|
|
@tool(
|
|
"Perform wifi scanning with iw. Requires the adapter be in managed mode. Adepter should be put into managed mode before running this if necessary and the interface name should be that of the managed mode interface",
|
|
args_schema=IwScan,
|
|
return_direct=False,
|
|
)
|
|
def wifi_network_reconnissance(interface: str) -> str:
|
|
"""Can pass a wireless interface name and wifi network name to return technical information about a wifi network"""
|
|
interface = interface.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run(
|
|
{
|
|
"commands": [
|
|
f"sudo ifconfig {interface} up",
|
|
"sleep 5",
|
|
f'sudo iw {interface} scan | grep -B 9 -A 206 "NetSec"',
|
|
]
|
|
}
|
|
) # TODO: fix error when passing network param
|
|
return res
|
|
|
|
|
|
class ChangeMonitorMode(BaseModel):
|
|
params: str = Field(
|
|
description="Should be command line parameters to 'airmon-ng' to change the state of a given wireless iterface mode."
|
|
)
|
|
|
|
|
|
@tool(
|
|
"Change the state of the wireless adapter mode with airmon-ng",
|
|
args_schema=ChangeMonitorMode,
|
|
return_direct=False,
|
|
)
|
|
def change_adapter_mode(params: str) -> str:
|
|
"""Can pass parameters to airmon-ng to change the mode of the wireless adapter"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
res = shell_tool.run({"commands": [f"sudo airmon-ng {params}", "sleep 5s"]})
|
|
return res
|
|
|
|
|
|
class Iwconfig(BaseModel):
|
|
params: str = Field(
|
|
description="should be command line parameters to 'iwconfig'. If none are needed, this should be left as an empty string"
|
|
)
|
|
|
|
|
|
@tool("Get interface information", args_schema=Iwconfig, return_direct=False)
|
|
def get_wireless_interface(params: str) -> str:
|
|
"""Return wireless interface information via iwconfig"""
|
|
params = params.replace("`", "").replace("\n", "") # fix buggy input from LLM
|
|
print("params ", params)
|
|
res = shell_tool.run({"commands": [f"iwconfig {params}"]})
|
|
return res
|