import json
import random
import threading

from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.tools import tool
from langchain_community.tools import ShellTool
from langchain.chains import LLMChain
from langchain_community.utilities.dalle_image_generator import DallEAPIWrapper
from scapy import *  # noqa: F401,F403 - wildcard import kept as in the original file

# Shared shell runner used by every tool defined in this module.
shell_tool = ShellTool()


def _run_packet_tool(binary: str, params: str) -> str:
    # Build "<binary> <params>" and execute it through the shared ShellTool,
    # returning whatever the ShellTool run yields.
    #
    # NOTE(security): `params` is interpolated into the command line without
    # any quoting or validation. If it can come from an untrusted source
    # (e.g. model output influenced by external data), shell metacharacters
    # in it will be interpreted by the shell. Review before exposing widely.
    return shell_tool.run({"commands": [f"{binary} {params}"]})


# ---------------------------------------------------------------------------
# tcpdump
# ---------------------------------------------------------------------------


class TCPDump(BaseModel):
    # Argument schema for the tcpdump tool: one free-form parameter string.
    params: str = Field(
        description="""A string of all parameters to the `tcpdump` command, including arguments and flags"""
    )


# NOTE(review): the docstring below is kept verbatim; LangChain's @tool
# presumably uses it as the tool description shown to the model, so rewording
# it would change runtime behavior - confirm before editing.
@tool(
    "Perform packet analysis using tcpdump",
    args_schema=TCPDump,
    return_direct=True,
)
def tcp_dump(params: str) -> str:
    """Must pass all parameters to `tcpdump` including arguments and flags to perform packet analysis"""
    return _run_packet_tool("tcpdump", params)


# ---------------------------------------------------------------------------
# tshark
# ---------------------------------------------------------------------------


class TShark(BaseModel):
    # Argument schema for the tshark tool: one free-form parameter string.
    params: str = Field(
        description="""A string of all parameters to the `tshark` command, including arguments and flags"""
    )


# NOTE(review): docstring kept verbatim for the same reason as tcp_dump.
@tool(
    "Perform packet analysis using tshark",
    args_schema=TShark,
    return_direct=True,
)
def tshark(params: str) -> str:
    """Must pass all parameters to `tshark` including arguments and flags to perform packet analysis"""
    # Fix: this previously invoked `tcpdump`, so the tshark tool never ran
    # tshark at all.
    return _run_packet_tool("tshark", params)


# TODO: Scapy tool

# Draft of an aircrack-ng tool, currently disabled (CrackPassword schema is
# not defined in this module).
#
# @tool(
#     "Perform wifi encryption cracking with aircrack-ng",
#     args_schema=CrackPassword,
#     return_direct=True,
# )
# def wifi_encryption_cracking(json_params: str) -> str:
#     """Must pass bssid and capfile parameters (as a string containing a json object) to aircrack-ng to perform wifi encryption cracking"""
#     json_params = json_params.replace("\\", "")
#     json_obj = json.loads(json_params)
#     bssid = json_obj["bssid"]
#     cap = json_obj["capfile"]
#     res = shell_tool.run(
#         {
#             "commands": [f"aircrack-ng --bssid {bssid} -w wordlist/rockyou.txt {cap}"]
#         }  # TODO: Abstract out wordlist - allow custom/multiple wordlist files as long as they sit in the directory
#     )
#     return res