"""
This application attempts to automatically solve CTF levels for CS492/CS592 Malware Reverse Engineering.
"""

import io
import os
import shutil
import tempfile
import zipfile
from time import time
from typing import Iterable, Literal

import requests
from dotenv import load_dotenv
from langchain.chains import LLMChain
from langchain.chains import SimpleSequentialChain
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_loaders import FileSystemBlobLoader
from langchain_community.document_loaders.blob_loaders.schema import Blob, BlobLoader
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
from validators import url

load_dotenv()

# def get_rag_chain():
#     return (
#         {"context": retriever | format_docs, "question": RunnablePassthrough()}
#         | prompt
#         | llm
#         | StrOutputParser()
#     )

# One shared HTTP session so the login cookie set by start_session() is
# reused by the download request in get_file_path().
session = requests.Session()

# NOTE: this rebinding shadows the `url` validator imported from `validators`
# above; from here on `url` is the base address of the CTF site.
url = "https://cs492.oregonctf.org/"

# Directory where downloaded level binaries are kept between runs.
_CACHE_DIR = "./temp"

# Headers sent with both form-encoded POST requests (login and download).
_FORM_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Content-Type": "application/x-www-form-urlencoded",
}


def get_group(chapter: int):
    """Map a chapter number to the name of the download set that contains it.

    Args:
        chapter: Chapter number parsed from the level name.

    Returns:
        The set name (e.g. ``"Ch01-08"``), or ``False`` when the chapter does
        not belong to any known set.
    """
    if 1 <= chapter <= 8:
        return "Ch01-08"
    if 11 <= chapter <= 13:
        return "Ch11-13"
    if 15 <= chapter <= 16:
        return "Ch15-16"
    if 18 <= chapter <= 21:
        return "Ch18-21"
    return False


def start_session():
    """Log in to the CTF site so later requests on ``session`` are authenticated.

    Raises:
        requests.HTTPError: If the server answers the login with an error status.
    """
    payload = {"username": "demo0", "passwd": "malware"}
    response = session.post(url, data=payload, headers=_FORM_HEADERS)
    # Surface a failed login here instead of as a confusing zip error later.
    response.raise_for_status()


def get_file_path(level, group):
    """Download the zip for ``group`` and move the ``level`` binary into the cache.

    Args:
        level: Exact file name of the level inside the archive.
        group: Set name as returned by ``get_group``.

    Returns:
        The path of the saved file under ``./temp``, or ``False`` when the
        archive has no top-level entry named ``level``.

    Raises:
        requests.HTTPError: If the download request returns an error status.
        zipfile.BadZipFile: If the response body is not a zip archive.
    """
    # `url` already ends with "/", so strip it to avoid requesting "//download".
    response = session.post(
        url.rstrip("/") + "/download",
        data={"setname": group},
        headers=_FORM_HEADERS,
    )
    response.raise_for_status()

    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            archive.extractall(temp_dir)

        if level not in os.listdir(temp_dir):
            return False

        # The cache directory may not exist on a first run.
        os.makedirs(_CACHE_DIR, exist_ok=True)
        saved_file_path = os.path.join(_CACHE_DIR, level)
        # shutil.move instead of os.rename: the system temp directory is often
        # on a different filesystem, where a plain rename raises OSError.
        shutil.move(os.path.join(temp_dir, level), saved_file_path)
        return saved_file_path


def execute(path: str):
    """Ask the LLM to describe the binary found at ``path``.

    Args:
        path: Location handed to ``FileSystemBlobLoader``.

    Returns:
        The model's text answer for the first blob the loader yields, or
        ``None`` when the loader yields nothing.
    """
    print("path ", path)
    loader = FileSystemBlobLoader(path, show_progress=True)
    prompt = PromptTemplate.from_template(
        "Describe the attached binary file: {file_content}"
    )
    llm = ChatOpenAI(model_name="gpt-4-turbo", temperature=0)
    # LLMChain is the chain that takes `prompt` and `llm`;
    # SimpleSequentialChain expects a list of already-built chains instead.
    chain = LLMChain(prompt=prompt, llm=llm)

    blobs: Iterable[Blob] = loader.yield_blobs()
    blob = next(iter(blobs), None)
    if blob is None:
        return None
    # NOTE(review): the Blob object itself is interpolated into the prompt, so
    # the model sees its string form rather than decoded file contents --
    # confirm this is what is intended.
    return chain.run(file_content=blob)


def _resolve_level(level: str) -> str:
    """Validate a level name and return a local path to its binary."""
    # Level names look like "Ch03DynA_Ltrace": characters 2-3 are the chapter.
    chapter = level[2:4].strip()
    if not chapter.isdecimal():
        raise ValueError("Bad input format (example: Ch03DynA_Ltrace)")

    group = get_group(int(chapter))
    if not group:
        raise ValueError("Not a valid chapter of a possible file")

    cached_path = os.path.join(_CACHE_DIR, level)
    if os.path.isfile(cached_path):
        print("found")
        return cached_path

    # Only log in when a download is actually needed.
    start_session()
    file_path = get_file_path(level, group)
    if not file_path:
        raise FileNotFoundError("Not a valid file")
    return file_path


def main() -> None:
    """Prompt for level names until an empty line or the first error."""
    print(
        "This program will attempt to automatically solve CS 492/592 Malware Reverse Engineering CTF levels."
    )
    print("Enter the name of the level (example: Ch03DynA_Ltrace)")
    while True:
        try:
            level: str = input("name of binary>> ")
            if not level:
                break
            results = execute(_resolve_level(level))
            print(results)
        except Exception as e:
            # Top-level boundary: report the problem and stop prompting.
            print(e)
            break


if __name__ == "__main__":
    main()