From 593bdaa1ffeeee5f2784d6b0172245eb450bccdf Mon Sep 17 00:00:00 2001 From: Elliot Chernofsky Date: Sat, 24 Aug 2024 12:20:37 -0400 Subject: [PATCH] Add internet detector utility Co-authored-by: Ana Martinez Gomez --- .../internet_detector.vm.nuspec | 13 + .../tools/chocolateyinstall.ps1 | 29 +++ .../tools/chocolateyuninstall.ps1 | 7 + .../tools/internet_detector.pyw | 239 ++++++++++++++++++ 4 files changed, 288 insertions(+) create mode 100644 packages/internet_detector.vm/internet_detector.vm.nuspec create mode 100644 packages/internet_detector.vm/tools/chocolateyinstall.ps1 create mode 100644 packages/internet_detector.vm/tools/chocolateyuninstall.ps1 create mode 100644 packages/internet_detector.vm/tools/internet_detector.pyw diff --git a/packages/internet_detector.vm/internet_detector.vm.nuspec b/packages/internet_detector.vm/internet_detector.vm.nuspec new file mode 100644 index 000000000..16248794c --- /dev/null +++ b/packages/internet_detector.vm/internet_detector.vm.nuspec @@ -0,0 +1,13 @@ + + + + internet_detector.vm + 1.0.0 + Elliot Chernofsky and Ana Martinez Gomez + Tool to detect if internet connectivity exists + + + + + + diff --git a/packages/internet_detector.vm/tools/chocolateyinstall.ps1 b/packages/internet_detector.vm/tools/chocolateyinstall.ps1 new file mode 100644 index 000000000..6a4dd0cc4 --- /dev/null +++ b/packages/internet_detector.vm/tools/chocolateyinstall.ps1 @@ -0,0 +1,29 @@ +$ErrorActionPreference = 'Stop' +Import-Module vm.common -Force -DisableNameChecking + +$toolName = 'internet_detector' +$category = 'Utilities' + +# Install dependency for windows api +VM-Pip-Install "pywin32" + +$toolDir = "${Env:RAW_TOOLS_DIR}\$toolName" +New-Item -Path $toolDir -ItemType Directory +VM-Assert-Path $toolDir + +# Download the script +(New-Object net.webclient).DownloadFile('https://raw.githubusercontent.com/mandiant/VM-Packages/main/packages/internet_detector.vm/tools/internet_detector.pyw', $toolDir) + +$executablePath = (Get-Command pythonw).Source +$filePath = Join-Path $toolDir "$toolName.pyw" + +VM-Install-Shortcut $toolName $category $executablePath -arguments $filePath + +# Define the action to be performed +$action = New-ScheduledTaskAction -Execute $executablePath -Argument "$filePath" + +# Define the trigger to run every 2 minutes +$trigger = New-ScheduledTaskTrigger -Once -At (Get-Date) -RepetitionInterval (New-TimeSpan -Minutes 2) + +# Create the scheduled task +Register-ScheduledTask -Action $action -Trigger $trigger -TaskName 'Internet Detector' -Force diff --git a/packages/internet_detector.vm/tools/chocolateyuninstall.ps1 b/packages/internet_detector.vm/tools/chocolateyuninstall.ps1 new file mode 100644 index 000000000..5aa4127d5 --- /dev/null +++ b/packages/internet_detector.vm/tools/chocolateyuninstall.ps1 @@ -0,0 +1,7 @@ +$ErrorActionPreference = 'Continue' +Import-Module vm.common -Force -DisableNameChecking + +$toolName = 'internet_detector' +$category = 'Utilities' + +VM-Uninstall $toolName $category diff --git a/packages/internet_detector.vm/tools/internet_detector.pyw b/packages/internet_detector.vm/tools/internet_detector.pyw new file mode 100644 index 000000000..e0d28dc09 --- /dev/null +++ b/packages/internet_detector.vm/tools/internet_detector.pyw @@ -0,0 +1,239 @@ +import threading +import requests +import win32api +import win32gui +import win32con +import urllib3 +import hashlib +import winreg +import signal +import ctypes +import time +import os +import re + + +# Define constants +CHECK_INTERVAL = 2 # Seconds +CONNECT_TEST_URL_AND_RESPONSES = { + "https://www.msftconnecttest.com/connecttest.txt": "Microsoft Connect Test", # HTTPS Test #1 + "http://www.google.com": 'Google', # HTTP Test + "https://www.wikipedia.com": 'Wikipedia', # HTTPS Test #2 + "https://www.youtube.com": 'YouTube' # HTTPS Test #3 + } +SPI_SETDESKWALLPAPER = 20 +SPIF_UPDATEINIFILE = 0x01 +SPIF_SENDWININICHANGE = 0x02 +COLOR_DESKTOP = 1 +ICON_INDICATOR_ON = os.path.join(os.environ.get('VM_COMMON_DIR'), "indicator_on.ico") +ICON_INDICATOR_OFF = os.path.join(os.environ.get('VM_COMMON_DIR'), "indicator_off.ico") +DEFAULT_BACKGROUND = os.path.join(os.environ.get('VM_COMMON_DIR'), "background.png") +INTERNET_BACKGROUND = os.path.join(os.environ.get('VM_COMMON_DIR'),"background-internet.png") + +# Global variables +tray_icon = None +stop_event = threading.Event() # To signal the background thread to stop +hwnd = None # We'll assign the window handle here later +check_thread = None +tray_icon_thread = None +# Win32 API icon handles +hicon_indicator_off = None +hicon_indicator_on = None + +def signal_handler(sig, frame): + global check_thread, tray_icon_thread, tray_icon + print("Ctrl+C detected. Exiting...") + stop_event.set() # Signal the background thread to stop + if check_thread: + check_thread.join() + if tray_icon_thread: + tray_icon_thread.join() + if tray_icon: + del tray_icon + exit(0) + +def load_icon(icon_path): + try: + return win32gui.LoadImage(None, icon_path, win32con.IMAGE_ICON, 0, 0, win32con.LR_LOADFROMFILE) + except Exception as e: + print(f"Error loading indicator icon: {e}") + return None + +class SysTrayIcon: + def __init__(self, hwnd, icon, tooltip): + self.hwnd = hwnd + self.icon = icon + self.tooltip = tooltip + # System tray icon data structure + self.nid = (self.hwnd, 0, win32gui.NIF_ICON | win32gui.NIF_MESSAGE | win32gui.NIF_TIP, + win32con.WM_USER + 20, self.icon, self.tooltip) + # Add the icon to the system tray + win32gui.Shell_NotifyIcon(win32gui.NIM_ADD, self.nid) + + def set_tooltip(self, new_tooltip): + self.tooltip = new_tooltip + self.nid = (self.hwnd, 0, win32gui.NIF_ICON | win32gui.NIF_MESSAGE | win32gui.NIF_TIP, + win32con.WM_USER + 20, self.icon, self.tooltip) + win32gui.Shell_NotifyIcon(win32gui.NIM_MODIFY, self.nid) + + def set_icon(self, icon): + self.icon = icon + self.nid = (self.hwnd, 0, win32gui.NIF_ICON | win32gui.NIF_MESSAGE | win32gui.NIF_TIP, + win32con.WM_USER + 20, self.icon, self.tooltip) + win32gui.Shell_NotifyIcon(win32gui.NIM_MODIFY, self.nid) + + def __del__(self): + # Remove the icon from the system tray when the object is destroyed + win32gui.Shell_NotifyIcon(win32gui.NIM_DELETE, self.nid) + +# Attempt to extract a known good value in response. +def extract_title(data): + match = re.search(r'(.*?)</title', data) + if match: + return match.group(1) + else: + return None + +def check_internet(): + for url, expected_response in CONNECT_TEST_URL_AND_RESPONSES.items(): + try: + # Perform internet connectivity tests + response = requests.get(url, timeout=5, verify=False) + if expected_response in (extract_title(response.text) or response.text): + print("Internet connectivity detected via URL: {}".format(url)) + return True + except: + pass + return False + +def check_internet_and_update_tray_icon(): + global tray_icon, hicon_indicator_off, hicon_indicator_on + if check_internet(): + tray_icon.set_icon(hicon_indicator_on) + tray_icon.set_tooltip("Internet Connection: Detected") + # Set the background to internet connection background + if get_wallpaper_path() != INTERNET_BACKGROUND: # Checked so program isn't continuously setting the wallpaper + set_wallpaper(INTERNET_BACKGROUND) + else: + tray_icon.set_icon(hicon_indicator_off) + tray_icon.set_tooltip("Internet Connection: Not Detected") + # Reset background when internet is not detected + if get_wallpaper_path() != DEFAULT_BACKGROUND: # Checked so program isn't continuously setting the wallpaper + set_wallpaper(DEFAULT_BACKGROUND) + +def check_internet_loop(): + while not stop_event.is_set(): + check_internet_and_update_tray_icon() + time.sleep(CHECK_INTERVAL) + +def tray_icon_loop(): + global hwnd, tray_icon, hicon_indicator_off, hicon_indicator_on, stop_event + # Load icons + hicon_indicator_off = load_icon(ICON_INDICATOR_OFF) + hicon_indicator_on = load_icon(ICON_INDICATOR_ON) + + # Wait for hwnd to be initialized + while hwnd is None: + time.sleep(0.1) + + if hicon_indicator_off is None or hicon_indicator_on is None: + print("Error: Failed to load icons. Exiting TrayIconThread.") + return + + tray_icon = SysTrayIcon(hwnd, hicon_indicator_off, "Internet Detector") + + while not stop_event.is_set(): + msg = win32gui.PeekMessage(hwnd, 0, 0, 0) + if msg and len(msg) == 6: + win32gui.TranslateMessage(msg) + win32gui.DispatchMessage(msg) + time.sleep(0.1) + +def enable_transparency_effects(): + try: + key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, + r"Software\Microsoft\Windows\CurrentVersion\Themes\Personalize", + 0, winreg.KEY_ALL_ACCESS) + + winreg.SetValueEx(key, "EnableTransparency", 0, winreg.REG_DWORD, 1) + winreg.CloseKey(key) + except WindowsError as e: + print(f"Error accessing or modifying registry: {e}") + +def get_wallpaper_path(): + """Attempts to retrieve the path to the current wallpaper image.""" + # Try to get the path from the registry (for wallpapers set through Windows settings) + try: + key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Control Panel\Desktop", 0, winreg.KEY_READ) + value, _ = winreg.QueryValueEx(key, "Wallpaper") + winreg.CloseKey(key) + if value: + return value + except WindowsError: + pass + + # Check for cached wallpaper files (if the above fails) + cached_files_dir = os.path.join(os.getenv("APPDATA"), r"Microsoft\Windows\Themes\CachedFiles") + transcoded_wallpaper_path = os.path.join(os.getenv("APPDATA"), r"Microsoft\Windows\Themes\TranscodedWallpaper") + + for file in os.listdir(cached_files_dir): + if file.endswith(('.jpg', '.jpeg', '.bmp', '.png')): + return os.path.join(cached_files_dir, file) + + if os.path.exists(transcoded_wallpaper_path): + return transcoded_wallpaper_path + + # If all else fails, return None + return None + +def set_wallpaper(image_path): + """Sets the desktop wallpaper to the image at the specified path.""" + print("Setting wallpaper to: {}".format(image_path)) + result = ctypes.windll.user32.SystemParametersInfoW( + SPI_SETDESKWALLPAPER, 0, image_path, SPIF_UPDATEINIFILE | SPIF_SENDWININICHANGE + ) + if not result: + print("Error setting wallpaper. Make sure the image path is correct.") + +def main_loop(): + global stop_event, check_thread, tray_icon_thread, tray_icon + # Create and start the threads + tray_icon_thread = threading.Thread(target=tray_icon_loop) + check_thread = threading.Thread(target=check_internet_loop) + + tray_icon_thread.start() + # Wait for the tray icon to finish initializing + while tray_icon is None: + time.sleep(0.1) + + check_thread.start() + + while not stop_event.is_set(): + time.sleep(1) + + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal_handler) + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + enable_transparency_effects() + + # Create a hidden window to receive messages (required for system tray icons) + def wndProc(hwnd, msg, wparam, lparam): + if lparam == win32con.WM_LBUTTONDBLCLK: + print("Left button double clicked") + elif msg == win32con.WM_COMMAND: + if wparam == 1023: # Example menu item ID + print("Exit selected") + win32gui.DestroyWindow(hwnd) + return win32gui.DefWindowProc(hwnd, msg, wparam, lparam) + + wc = win32gui.WNDCLASS() + hinst = wc.hInstance = win32api.GetModuleHandle(None) + wc.lpszClassName = "Internet Detector" + wc.lpfnWndProc = wndProc + classAtom = win32gui.RegisterClass(wc) + hwnd = win32gui.CreateWindow(classAtom, "Internet Detector", 0, 0, 0, 0, 0, 0, 0, hinst, None) + + print("Current wallpaper: {}".format(get_wallpaper_path())) + + main_loop()