"""
This application attempts to automatically solve CTF levels for CS492/CS592
Malware Reverse Engineering.

Workflow: the user types a level name (example: Ch03DynA_Ltrace). The two
characters at index 2-3 of the name are read as the chapter number, the
chapter is mapped to a download group, the group's zip archive is fetched from
the CTF site, the level file is cached under ./temp, and the file is handed to
an LLM with a "describe this binary" prompt.

Notes carried over from the patch this module was recovered from:
- .gitignore gains a `*temp*` entry, so the ./temp cache stays untracked.
- hw5/requirnments.txt lists: langchain-community, langchain_openai,
  python-dotenv, validators, esprima.
"""

import io
import os
import shutil
import tempfile
import zipfile
from time import time
from typing import Iterable, Literal

import requests
from dotenv import load_dotenv
from langchain.chains import LLMChain
from langchain.chains import SimpleSequentialChain
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_loaders import FileSystemBlobLoader
from langchain_community.document_loaders.blob_loaders.schema import Blob, BlobLoader
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from validators import url

load_dotenv()

session = requests.Session()
# NOTE: this assignment rebinds the name `url` imported from `validators`
# above; the module-level name is kept as-is for backward compatibility.
url = "https://cs492.oregonctf.org/"

# Directory where downloaded level files are cached between runs.
CACHE_DIR = "./temp"

# Headers sent with every form POST to the CTF site.
_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Content-Type": "application/x-www-form-urlencoded",
}

# (first chapter, last chapter, download group name), both bounds inclusive.
_CHAPTER_GROUPS = (
    (1, 8, "Ch01-08"),
    (11, 13, "Ch11-13"),
    (15, 16, "Ch15-16"),
    (18, 21, "Ch18-21"),
)


def get_group(chapter: int):
    """Return the download group name for *chapter*, or False if none matches.

    Args:
        chapter: Chapter number parsed from the level name.

    Returns:
        One of "Ch01-08", "Ch11-13", "Ch15-16", "Ch18-21", or False when the
        chapter falls outside every known range.
    """
    for first, last, group in _CHAPTER_GROUPS:
        if first <= chapter <= last:
            return group
    return False


def start_session():
    """Log in to the CTF site so the shared `session` carries its cookies."""
    payload = {"username": "demo0", "passwd": "malware"}
    session.post(url, data=payload, headers=_REQUEST_HEADERS)


def get_file_path(level, group):
    """Download the archive for *group* and cache the file named *level*.

    Args:
        level: Name of the level file expected at the top of the archive.
        group: Value sent as the `setname` form field of the download request.

    Returns:
        The path of the cached file under CACHE_DIR, or False when the
        archive has no top-level entry named *level*.

    Raises:
        requests.HTTPError: The download request returned an error status.
        zipfile.BadZipFile: The response body is not a zip archive.
    """
    payload = {"setname": group}
    # `url` ends with "/", so strip it to avoid requesting "//download".
    download_url = url.rstrip("/") + "/download"
    response = session.post(download_url, data=payload, headers=_REQUEST_HEADERS)
    # Fail with the HTTP error instead of a confusing BadZipFile on an error page.
    response.raise_for_status()

    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            archive.extractall(temp_dir)

        if level not in os.listdir(temp_dir):
            return False

        # The cache directory may not exist yet, and the system temp directory
        # may live on another filesystem where os.rename fails; shutil.move
        # handles both cases.
        os.makedirs(CACHE_DIR, exist_ok=True)
        saved_file_path = os.path.join(CACHE_DIR, level)
        shutil.move(os.path.join(temp_dir, level), saved_file_path)
        return saved_file_path


def execute(path: str):
    """Ask the LLM to describe the file at *path*.

    Args:
        path: Path handed to FileSystemBlobLoader.

    Returns:
        The model's answer for the first blob the loader yields, or None when
        the loader yields nothing.
    """
    print("path ", path)
    loader = FileSystemBlobLoader(path, show_progress=True)
    prompt = PromptTemplate.from_template(
        "Describe the attached binary file: {file_content}"
    )
    llm = ChatOpenAI(model_name="gpt-4-turbo", temperature=0)
    # SimpleSequentialChain takes a list of chains, not prompt/llm keywords;
    # a single prompt -> model -> string pipeline is what is needed here.
    chain = prompt | llm | StrOutputParser()

    blobs: Iterable[Blob] = loader.yield_blobs()
    for blob in blobs:
        # NOTE(review): the Blob object itself is interpolated into the prompt,
        # so the model presumably sees its string form rather than the file's
        # bytes - confirm this is the intended input.
        return chain.invoke({"file_content": blob})
    return None


def main():
    """Prompt for level names until an empty line (or EOF) and describe each."""
    print(
        "This program will attempt to automatically solve CS 492/592 Malware Reverse Engineering CTF levels."
    )
    print("Enter the name of the level (example: Ch03DynA_Ltrace)")

    while True:
        try:
            level: str = input("name of binary>> ")
            if not level:
                break

            chapter = level[2:4]
            # Reject names with path components so the lookup stays in CACHE_DIR.
            if not chapter.strip().isdigit() or os.path.basename(level) != level:
                raise ValueError("Bad input format (example: Ch03DynA_Ltrace)")

            group = get_group(int(chapter))
            if not group:
                raise ValueError("Not a valid chapter of a possible file")

            file_path = os.path.join(CACHE_DIR, level)
            if os.path.isfile(file_path):
                print("found")
            else:
                # Only log in when a download is actually required.
                start_session()
                file_path = get_file_path(level, group)
            if not file_path:
                raise ValueError("Not a valid file")

            results = execute(file_path)
            print(results)
        except EOFError:
            # Closed stdin would otherwise raise on every iteration forever.
            break
        except Exception as e:
            # NOTE(review): the original handler body is not visible in the
            # patch; reporting the error and prompting again is an assumption.
            print(e)


if __name__ == "__main__":
    main()