Commit: Moves all _logs to global log

Neverbolt committed Aug 9, 2024
1 parent f3c0642 commit 17a1d57

Showing 13 changed files with 126 additions and 104 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -96,22 +96,22 @@ class MinimalLinuxPrivesc(Agent):
     def perform_round(self, turn: int) -> bool:
         got_root: bool = False

-        with self._log.console.status("[bold green]Asking LLM for a new command..."):
+        with self.log.console.status("[bold green]Asking LLM for a new command..."):
             # get as much history as fits into the target context size
             history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - self._template_size)

             # get the next command from the LLM
             answer = self.llm.get_response(template_next_cmd, capabilities=self.get_capability_block(), history=history, conn=self.conn)
             cmd = llm_util.cmd_output_fixer(answer.result)

-        with self._log.console.status("[bold green]Executing that command..."):
-            self._log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
+        with self.log.console.status("[bold green]Executing that command..."):
+            self.log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
             result, got_root = self.get_capability(cmd.split(" ", 1)[0])(cmd)

         # log and output the command and its result
-        self._log.log_db.add_log_query(self._log.run_id, turn, cmd, result, answer)
+        self.log.add_log_query(turn, cmd, result, answer)
         self._sliding_history.add_command(cmd, result)
-        self._log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))
+        self.log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))

         # if we got root, we can stop the loop
         return got_root
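The README example above now talks to the logger through the public `log` attribute, and `add_log_query` tracks the `run_id` itself instead of every caller passing `self._log.run_id` along. A minimal sketch of the resulting logger lifecycle, using the `RawLogger` introduced in `src/hackingBuddyGPT/usecases/base.py` below (the `FakeDb` stand-in is hypothetical; in hackingBuddyGPT the logger and its `DbStorage` are normally injected by the configuration layer, not built by hand):

```python
from rich.console import Console

from hackingBuddyGPT.usecases.base import RawLogger

# Hypothetical stand-in for the project's DbStorage, only to show the call
# sequence; the real class is hackingBuddyGPT.utils.db_storage.db_storage.DbStorage.
class FakeDb:
    def create_new_run(self, name, tag, configuration):
        print(f"create_new_run({name!r}, {tag!r}, {configuration!r})")
        return 1

    def add_log_query(self, run_id, turn, command, result, answer):
        print(f"run {run_id}, turn {turn}: {command!r} -> {result!r}")

log = RawLogger(log_db=FakeDb(), console=Console())
log.start_run("MinimalLinuxPrivesc", configuration="{}")  # once, at init time
log.add_log_query(1, "whoami", "lowpriv", answer=None)    # once per executed command
```

Calling `start_run` a second time raises `ValueError`, which is what lets one logger be shared globally without accidentally opening two run records.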
13 changes: 7 additions & 6 deletions src/hackingBuddyGPT/usecases/agents.py
@@ -12,9 +12,10 @@

 @dataclass
 class Agent(ABC):
+    log: Logger = None
+
     _capabilities: Dict[str, Capability] = field(default_factory=dict)
     _default_capability: Capability = None
-    _log: Logger = None

     llm: OpenAIConnection = None

@@ -76,7 +77,7 @@ def set_template(self, template:str):
     def perform_round(self, turn:int) -> bool:
         got_root : bool = False

-        with self._log.console.status("[bold green]Asking LLM for a new command..."):
+        with self.log.console.status("[bold green]Asking LLM for a new command..."):
             # TODO output/log state
             options = self._state.to_template()
             options.update({
@@ -87,16 +88,16 @@ def perform_round(self, turn:int) -> bool:
             answer = self.llm.get_response(self._template, **options)
             cmd = llm_util.cmd_output_fixer(answer.result)

-        with self._log.console.status("[bold green]Executing that command..."):
-            self._log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
+        with self.log.console.status("[bold green]Executing that command..."):
+            self.log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
             capability = self.get_capability(cmd.split(" ", 1)[0])
             result, got_root = capability(cmd)

         # log and output the command and its result
-        self._log.add_log_query(turn, cmd, result, answer)
+        self.log.add_log_query(turn, cmd, result, answer)
         self._state.update(capability, cmd, result)
         # TODO output/log new state
-        self._log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))
+        self.log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))

         # if we got root, we can stop the loop
         return got_root
37 changes: 18 additions & 19 deletions src/hackingBuddyGPT/usecases/base.py
@@ -13,18 +13,19 @@
 from hackingBuddyGPT.utils.db_storage.db_storage import DbStorage


+@configurable("logger", "Logger")
 @dataclass
-class Logger:
+class RawLogger:
     log_db: DbStorage
     console: Console
-    model: str = ""
     tag: str = ""
-    configuration: str = ""

     run_id: int = field(init=False, default=None)

-    def __post_init__(self):
-        self.run_id = self.log_db.create_new_run(self.model, self.tag, self.configuration)
+    def start_run(self, name: str, configuration: str):
+        if self.run_id is not None:
+            raise ValueError("Run already started")
+        self.run_id = self.log_db.create_new_run(name, self.tag, configuration)

     def add_log_query(self, turn: int, command: str, result: str, answer: LLMResult):
         self.log_db.add_log_query(self.run_id, turn, command, result, answer)
@@ -53,6 +54,9 @@ def status_message(self, message: str):
         self.log_db.add_log_message(self.run_id, "status", message, 0, 0, 0)


+Logger = Global(Transparent(RawLogger))
+
+
 @dataclass
 class UseCase(abc.ABC):
     """
@@ -65,19 +69,15 @@ class UseCase(abc.ABC):
     so that they can be automatically discovered and run from the command line.
     """

-    log_db: DbStorage
-    console: Console
-    tag: str = ""
-
-    _log: Logger = None
+    log: Logger

     def init(self, configuration):
         """
         The init method is called before the run method. It is used to initialize the UseCase, and can be used to
         perform any dynamic setup that is needed before the run method is called. One of the most common use cases is
         setting up the llm capabilities from the tools that were injected.
         """
-        self._log = Logger(self.log_db, self.console, self.get_name(), self.tag, self.serialize_configuration(configuration))
+        self.log.start_run(self.get_name(), self.serialize_configuration(configuration))

     def serialize_configuration(self, configuration) -> str:
         return json.dumps(configuration)
@@ -123,27 +123,27 @@ def run(self):
         turn = 1
         try:
             while turn <= self.max_turns and not self._got_root:
-                self._log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")
+                self.log.console.log(f"[yellow]Starting turn {turn} of {self.max_turns}")

                 self._got_root = self.perform_round(turn)

                 # finish turn and commit logs to storage
-                self._log.log_db.commit()
+                self.log.log_db.commit()
                 turn += 1

             self.after_run()

             # write the final result to the database and console
             if self._got_root:
-                self._log.run_was_success()
-                self._log.console.print(Panel("[bold green]Got Root!", title="Run finished"))
+                self.log.run_was_success()
+                self.log.console.print(Panel("[bold green]Got Root!", title="Run finished"))
             else:
-                self._log.run_was_failure("maximum turn number reached")
-                self._log.console.print(Panel("[green]maximum turn number reached", title="Run finished"))
+                self.log.run_was_failure("maximum turn number reached")
+                self.log.console.print(Panel("[green]maximum turn number reached", title="Run finished"))

             return self._got_root
         except Exception as e:
-            self._log.run_was_failure(f"exception occurred: {e}")
+            self.log.run_was_failure(f"exception occurred: {e}")
             raise


@@ -192,7 +192,6 @@ class AutonomousAgentUseCase(AutonomousUseCase):

     def init(self, configuration):
         super().init(configuration)
-        self.agent._log = self._log
         self.agent.init()

     def get_name(self) -> str:
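The one-line `Logger = Global(Transparent(RawLogger))` is the heart of this commit: every `log: Logger` field declared on a use case or agent now resolves to one shared `RawLogger` per process, which is why `AutonomousAgentUseCase.init` no longer needs the removed `self.agent._log = self._log` hand-off. `Global` and `Transparent` come from hackingBuddyGPT's configuration machinery and are not shown in this diff; purely as an illustrative mental model (an assumption, not the project's actual implementation), they can be pictured like this:

```python
# Illustrative sketch only -- the real Global/Transparent live in
# hackingBuddyGPT's configuration layer and are not part of this diff.

class Transparent:
    """Marks a wrapped class whose interface should shine through the wrapper."""
    def __init__(self, wrapped):
        self.wrapped = wrapped


class Global:
    """Hands out a single shared instance of the wrapped class per process."""
    def __init__(self, transparent):
        self._cls = transparent.wrapped
        self._instance = None

    def __call__(self, *args, **kwargs):
        if self._instance is None:          # first caller creates the instance
            self._instance = self._cls(*args, **kwargs)
        return self._instance               # later callers get the same object
```

Under that model, `UseCase.init` calls `self.log.start_run(...)` exactly once, and the `run_id is not None` guard in `start_run` catches any second attempt.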
10 changes: 5 additions & 5 deletions src/hackingBuddyGPT/usecases/minimal/agent.py
@@ -27,22 +27,22 @@ def init(self):
     def perform_round(self, turn: int) -> bool:
         got_root: bool = False

-        with self._log.console.status("[bold green]Asking LLM for a new command..."):
+        with self.log.console.status("[bold green]Asking LLM for a new command..."):
             # get as much history as fits into the target context size
             history = self._sliding_history.get_history(self.llm.context_size - llm_util.SAFETY_MARGIN - self._template_size)

             # get the next command from the LLM
             answer = self.llm.get_response(template_next_cmd, capabilities=self.get_capability_block(), history=history, conn=self.conn)
             cmd = llm_util.cmd_output_fixer(answer.result)

-        with self._log.console.status("[bold green]Executing that command..."):
-            self._log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
+        with self.log.console.status("[bold green]Executing that command..."):
+            self.log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
             result, got_root = self.get_capability(cmd.split(" ", 1)[0])(cmd)

         # log and output the command and its result
-        self._log.add_log_query(turn, cmd, result, answer)
+        self.log.add_log_query(turn, cmd, result, answer)
         self._sliding_history.add_command(cmd, result)
-        self._log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))
+        self.log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))

         # if we got root, we can stop the loop
         return got_root
26 changes: 13 additions & 13 deletions src/hackingBuddyGPT/usecases/privesc/common.py
@@ -37,7 +37,7 @@ def init(self):

     def before_run(self):
         if self.hint != "":
-            self._log.console.print(f"[bold green]Using the following hint: '{self.hint}'")
+            self.log.console.print(f"[bold green]Using the following hint: '{self.hint}'")

         if self.disable_history is False:
             self._sliding_history = SlidingCliHistory(self.llm)
@@ -57,48 +57,48 @@ def before_run(self):
     def perform_round(self, turn: int) -> bool:
         got_root: bool = False

-        with self._log.console.status("[bold green]Asking LLM for a new command..."):
+        with self.log.console.status("[bold green]Asking LLM for a new command..."):
             answer = self.get_next_command()
             cmd = answer.result

-        with self._log.console.status("[bold green]Executing that command..."):
-            self._log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
+        with self.log.console.status("[bold green]Executing that command..."):
+            self.log.console.print(Panel(answer.result, title="[bold cyan]Got command from LLM:"))
             _capability_descriptions, parser = capabilities_to_simple_text_handler(self._capabilities, default_capability=self._default_capability)
             success, *output = parser(cmd)
             if not success:
-                self._log.console.print(Panel(output[0], title="[bold red]Error parsing command:"))
+                self.log.console.print(Panel(output[0], title="[bold red]Error parsing command:"))
                 return False

         assert(len(output) == 1)
         capability, cmd, (result, got_root) = output[0]

         # log and output the command and its result
-        self._log.add_log_query(turn, cmd, result, answer)
+        self.log.add_log_query(turn, cmd, result, answer)
         if self._sliding_history:
             self._sliding_history.add_command(cmd, result)

-        self._log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))
+        self.log.console.print(Panel(result, title=f"[bold cyan]{cmd}"))

         # analyze the result..
         if self.enable_explanation:
-            with self._log.console.status("[bold green]Analyze its result..."):
+            with self.log.console.status("[bold green]Analyze its result..."):
                 answer = self.analyze_result(cmd, result)
-                self._log.add_log_analyze_response(turn, cmd, answer.result, answer)
+                self.log.add_log_analyze_response(turn, cmd, answer.result, answer)

         # .. and let our local model update its state
         if self.enable_update_state:
             # this must happen before the table output as we might include the
             # status processing time in the table..
-            with self._log.console.status("[bold green]Updating fact list.."):
+            with self.log.console.status("[bold green]Updating fact list.."):
                 state = self.update_state(cmd, result)
-                self._log.add_log_update_state(turn, "", state.result, state)
+                self.log.add_log_update_state(turn, "", state.result, state)

         # Output Round Data..
-        self._log.console.print(ui.get_history_table(self.enable_explanation, self.enable_update_state, self._log.run_id, self._log.log_db, turn))
+        self.log.console.print(ui.get_history_table(self.enable_explanation, self.enable_update_state, self.log.run_id, self.log.log_db, turn))

         # .. and output the updated state
         if self.enable_update_state:
-            self._log.console.print(Panel(self._state, title="What does the LLM Know about the system?"))
+            self.log.console.print(Panel(self._state, title="What does the LLM Know about the system?"))

         # if we got root, we can stop the loop
         return got_root
28 changes: 15 additions & 13 deletions src/hackingBuddyGPT/usecases/privesc/linux.py
@@ -44,9 +44,9 @@ def read_hint(self):
             if self.agent.conn.hostname in hints:
                 return hints[self.agent.conn.hostname]
         except FileNotFoundError:
-            self._log.console.print("[yellow]Hint file not found")
+            self.log.console.print("[yellow]Hint file not found")
         except Exception as e:
-            self._log.console.print("[yellow]Hint file could not loaded:", str(e))
+            self.log.console.print("[yellow]Hint file could not loaded:", str(e))
         return ""


@@ -61,23 +61,25 @@ class LinuxPrivescWithLSEUseCase(UseCase):

     _got_root: bool = False

-    # use either an use-case or an agent to perform the privesc
+    # use either a use-case or an agent to perform the privesc
     use_use_case: bool = False
+    _configuration: any = None

     def init(self, configuration=None):
         super().init(configuration)
+        self._configuration = configuration

     # simple helper that uses lse.sh to get hints from the system
     def call_lse_against_host(self):
-        self._log.console.print("[green]performing initial enumeration with lse.sh")
+        self.log.console.print("[green]performing initial enumeration with lse.sh")

         run_cmd = "wget -q 'https://github.com/diego-treitos/linux-smart-enumeration/releases/latest/download/lse.sh' -O lse.sh;chmod 700 lse.sh; ./lse.sh -c -i -l 0 | grep -v 'nope$' | grep -v 'skip$'"

         result, _ = SSHRunCommand(conn=self.conn, timeout=120)(run_cmd)

-        self.console.print("[yellow]got the output: " + result)
+        self.log.console.print("[yellow]got the output: " + result)
         cmd = self.llm.get_response(template_lse, lse_output=result, number=3)
-        self.console.print("[yellow]got the cmd: " + cmd.result)
+        self.log.console.print("[yellow]got the cmd: " + cmd.result)

         return [x for x in cmd.result.splitlines() if x.strip()]

@@ -98,7 +100,7 @@ def run(self):
                 result = self.run_using_agent(hint, turns_per_hint)

             if result is True:
-                self.console.print("[green]Got root!")
+                self.log.console.print("[green]Got root!")
                 return True

     def run_using_usecases(self, hint, turns_per_hint):
@@ -110,13 +112,13 @@ def run_using_usecases(self, hint, turns_per_hint):
                 enable_update_state = self.enable_update_state,
                 disable_history = self.disable_history,
                 llm = self.llm,
-                hint = hint
+                hint = hint,
+                log = self.log,
             ),
             max_turns = turns_per_hint,
-            log_db = self.log_db,
-            console = self.console
+            log = self.log,
         )
-        linux_privesc.init()
+        linux_privesc.init(self._configuration)
         return linux_privesc.run()

     def run_using_agent(self, hint, turns_per_hint):
@@ -129,15 +131,15 @@ def run_using_agent(self, hint, turns_per_hint):
             enable_update_state = self.enable_update_state,
             disable_history = self.disable_history
         )
-        agent._log = self._log
+        agent.log = self.log
         agent.init()

         # perform the privilege escalation
         agent.before_run()
         turn = 1
         got_root = False
         while turn <= turns_per_hint and not got_root:
-            self._log.console.log(f"[yellow]Starting turn {turn} of {turns_per_hint}")
+            self.log.console.log(f"[yellow]Starting turn {turn} of {turns_per_hint}")

             if agent.perform_round(turn) is True:
                 got_root = True
12 changes: 6 additions & 6 deletions src/hackingBuddyGPT/usecases/web/simple.py
@@ -52,11 +52,11 @@ def init(self):
         }

     def all_flags_found(self):
-        self._log.console.print(Panel("All flags found! Congratulations!", title="system"))
+        self.log.console.print(Panel("All flags found! Congratulations!", title="system"))
         self._all_flags_found = True

     def perform_round(self, turn: int):
-        with self._log.console.status("[bold green]Asking LLM for a new command..."):
+        with self.log.console.status("[bold green]Asking LLM for a new command..."):
             prompt = self._prompt_history  # TODO: in the future, this should do some context truncation

             tic = time.perf_counter()
@@ -66,17 +66,17 @@ def perform_round(self, turn: int):
             message = completion.choices[0].message
             tool_call_id = message.tool_calls[0].id
             command = pydantic_core.to_json(response).decode()
-            self._log.console.print(Panel(command, title="assistant"))
+            self.log.console.print(Panel(command, title="assistant"))
             self._prompt_history.append(message)

             answer = LLMResult(completion.choices[0].message.content, str(prompt), completion.choices[0].message.content, toc-tic, completion.usage.prompt_tokens, completion.usage.completion_tokens)

-        with self._log.console.status("[bold green]Executing that command..."):
+        with self.log.console.status("[bold green]Executing that command..."):
             result = response.execute()
-            self._log.console.print(Panel(result, title="tool"))
+            self.log.console.print(Panel(result, title="tool"))
             self._prompt_history.append(tool_message(result, tool_call_id))

-        self._log.add_log_query(turn, command, result, answer)
+        self.log.add_log_query(turn, command, result, answer)
         return self._all_flags_found


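The `LLMResult` above is assembled by hand around the raw OpenAI completion; spread out, the dense one-liner reads as below. The field order mirrors the call in the diff; `client` and `prompt` are assumed names from the truncated surrounding code, so this is a schematic expansion rather than a standalone program:

```python
import time

tic = time.perf_counter()
completion = client.chat.completions.create(...)  # actual call elided in the diff
toc = time.perf_counter()

answer = LLMResult(
    completion.choices[0].message.content,  # result
    str(prompt),                            # prompt that was sent
    completion.choices[0].message.content,  # raw answer text
    toc - tic,                              # wall-clock duration in seconds
    completion.usage.prompt_tokens,
    completion.usage.completion_tokens,
)
```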
(6 more changed files not shown.)