9 Commits

Author SHA1 Message Date
Novattz
b77cea8d0b refactor: complete UI overhaul and add SmokeAPI support 2025-01-22 13:43:50 +01:00
Tickbase
fbf04d0020 Update README.md 2025-01-08 05:31:27 +01:00
Tickbase
d26e1d154e Update and rename bug_report.md to report.md 2025-01-08 05:04:10 +01:00
Tickbase
0a74ba0f4c Update README.md 2024-12-04 07:42:26 +01:00
Tickbase
e000afacbc Update README.md 2024-12-04 01:23:03 +01:00
Tickbase
25c1c46e5d Fix my dumb ahh 2024-11-30 15:13:10 +01:00
Tickbase
b9d418805f Update dlc_fetcher.py 2024-11-30 11:40:02 +01:00
Tickbase
c99d747642 Update dlc_fetcher.py
Added back debug prints which got removed by accident
2024-11-30 11:19:49 +01:00
Tickbase
2a94f19719 Update README.md 2024-11-29 22:16:26 +01:00
7 changed files with 1115 additions and 395 deletions

View File

@@ -1,8 +1,6 @@
---
name: Bug report
name: Report
about: Create a report to help us improve
title: "[BUG]"
labels: bug
assignees: Novattz
---

View File

@@ -1,40 +1,44 @@
# Steam DLC Fetcher and installer for Linux
- Python script designed for linux to automate fetching of DLC id's for steam games and the installation of creamlinux automatically. [Demo/Tutorial](https://www.youtube.com/watch?v=Y1E15rUsdDw)
- A user-friendly tool for managing DLC for Steam games on Linux systems.
[Demo/Tutorial](https://www.youtube.com/watch?v=Y1E15rUsdDw) - [OUTDATED]
### Features
- Automatically fetches and lists DLC's for selected steam game(s) installed on the computer.
- Automatically installs creamlinux and its components into selected steam games, excluding and handling specific config files.
- Provides a simple cli to navigate and operate the entire process.
- Automatic Steam library detection
- Support for Linux and Proton
- Automatic updates (Soon)
- DLC detection and installation
## Usage
### Prerequisites
- `python 3.x`
- `requests` library
- `zipfile` library
- Python 3.7 or higher
- requests
- rich
- argparse
- json
### Installation
- Clone the repo or download the script.
- Navigate to the directory containing the script.
- Run the script using python.
#### OR
Use this one-line shell script.
- Run the script using python:
```bash
git clone https://github.com/Novattz/creamlinux-installer;cd creamlinux-installer;python dlc_fetcher.py
python main.py
```
### Basic Usage
- `--manual <path>`: Specify steam library path manually
```bash
python main.py --manual "/path/to/steamapps"
```
- `--debug`: Enable debug logging
```bash
python main.py --debug
```
## TODO
- [ ] Cross reference dlc files and dlc id's. Incase dlc id's and dlc files differ in terms of quantity it will notify the user.
- [ ] Possibly add functionality to search for dlc files/automatically installing them.
- [ ] Add the possibility to install cream/smokeapi for games running proton.
- [ ] Check if the game already has dlc files installed
- [ ] Gui?
- [ ] Add checker for configs already applied to games. (i.e script will check for new dlc id's for already applied games.)
- [ ] Add a way to check if a game blocks LD_PRELOAD.
### Issues?
- Open a issue and attach all relevant errors/logs.
# Credits
- [All credits for creamlinux go to its original author and contributors.](https://github.com/anticitizn/creamlinux)
## Credits
- [Creamlinux](https://github.com/anticitizn/creamlinux) by anticitizn
- [SmokeAPI](https://github.com/acidicoala/SmokeAPI) by acidicoala

View File

@@ -1,368 +0,0 @@
import os
import re
import requests
import zipfile
import time
import stat
import subprocess
from collections import defaultdict
import logging
import argparse
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
LOG_FILE = 'script.log'
DEBUG_FILE = 'debug_script.log'
TIMEOUT = 180 # Timeout in seconds (3 minutes)
def setup_logging(debug):
log_format = '%(asctime)s [%(levelname)s] %(message)s'
date_format = '%m-%d %H:%M:%S'
if debug:
logging.basicConfig(filename=DEBUG_FILE, level=logging.DEBUG, format=log_format, datefmt=date_format)
else:
logging.basicConfig(filename=LOG_FILE, level=logging.ERROR, format=log_format, datefmt=date_format)
def clear_screen():
os.system('cls' if os.name == 'nt' else 'clear')
def log_error(message):
logging.error(message)
print(message)
def log_debug(message):
logging.debug(message)
def read_steam_registry():
registry_path = os.path.expanduser('~/.steam/registry.vdf')
if os.path.exists(registry_path):
log_debug(f"Found Steam registry file: {registry_path}")
with open(registry_path, 'r') as f:
content = f.read()
install_path = re.search(r'"InstallPath"\s*"([^"]+)"', content)
if install_path:
return install_path.group(1)
return None
def fetch_latest_version():
try:
response = requests.get("https://api.github.com/repos/Novattz/creamlinux-installer/releases/latest")
data = response.json()
return data['tag_name']
except requests.exceptions.RequestException as e:
log_error(f"Failed to fetch latest version: {str(e)}")
return "Unknown"
def show_header(app_version, debug_mode):
clear_screen()
cyan = '\033[96m'
red = '\033[91m'
reset = '\033[0m'
print(f"{cyan}")
print(r"""
_ _ _ _ _ ____ _____ _ ________ _____
| | | | \ | | | / __ \ / ____| |/ / ____| __ \
| | | | \| | | | | | | | | ' /| |__ | |__) |
| | | | . ` | | | | | | | | < | __| | _ /
| |__| | |\ | |___| |__| | |____| . \| |____| | \ \
\____/|_| \_|______\____/ \_____|_|\_\______|_| \_\
""")
print(f"""
> Made by Tickbase
> GitHub: https://github.com/Novattz/creamlinux-installer
> Version: {app_version}
{reset}
""")
if debug_mode:
print(f"{red} [Running in DEBUG mode]{reset}\n")
print()
app_version = fetch_latest_version()
#app_version = "TESTING / LETS NOT OVERLOAD GITHUB FOR NO REASON"
def parse_vdf(file_path):
library_paths = []
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
paths = re.findall(r'"path"\s*"(.*?)"', content, re.IGNORECASE)
library_paths.extend([os.path.normpath(path) for path in paths])
except Exception as e:
log_error(f"Failed to read {file_path}: {str(e)}")
return library_paths
def find_steam_binary():
try:
result = subprocess.run(['which', 'steam'], stdout=subprocess.PIPE)
steam_path = result.stdout.decode('utf-8').strip()
if steam_path:
return os.path.dirname(steam_path)
except Exception as e:
log_error(f"Failed to locate steam binary: {str(e)}")
return None
def find_steam_library_folders(manual_path=""):
search_list = [
# Default
os.path.expanduser('~/.steam/steam'),
os.path.expanduser('~/.local/share/Steam'),
# Steam Deck
os.path.expanduser('/home/deck/.steam/steam'),
os.path.expanduser('/home/deck/.local/share/Steam'),
# Others
'/mnt/Jogos/Steam',
'/run/media/mmcblk0p1',
# Flatpak
os.path.expanduser('~/.var/app/com.valvesoftware.Steam/.local/share/Steam'),
os.path.expanduser('~/.var/app/com.valvesoftware.Steam/data/Steam/steamapps/common')
]
library_folders = []
try:
if manual_path:
log_debug(f"Manual game path set to \"{manual_path}\" skipping path lookup")
library_folders.append(manual_path)
else:
steam_binary_path = find_steam_binary()
if steam_binary_path and steam_binary_path not in search_list:
search_list.append(steam_binary_path)
steam_install_path = read_steam_registry()
if steam_install_path and steam_install_path not in search_list:
search_list.append(steam_install_path)
log_debug(f"Paths that will be searched: {search_list}")
for search_path in search_list:
if os.path.exists(search_path):
log_debug(f"Scanning path: {search_path}")
steamapps_path = str(os.path.normpath(f"{search_path}/steamapps"))
if os.path.exists(steamapps_path):
library_folders.append(steamapps_path)
log_debug(f"Found steamapps folder: {steamapps_path}")
vdf_path = os.path.join(steamapps_path, 'libraryfolders.vdf')
if os.path.exists(vdf_path):
log_debug(f"Found libraryfolders.vdf: {vdf_path}")
additional_paths = parse_vdf(vdf_path)
for path in additional_paths:
new_steamapps_path = os.path.join(path, 'steamapps')
if os.path.exists(new_steamapps_path):
library_folders.append(new_steamapps_path)
log_debug(f"Added additional steamapps folder: {new_steamapps_path}")
if not library_folders:
raise FileNotFoundError("No Steam library folders found.")
except Exception as e:
log_error(f"Error finding Steam library folders: {e}")
log_error("Scanned paths:")
for path in search_list:
log_error(f" - {path}")
return library_folders
def process_acf_file(folder, item):
try:
app_id, game_name, install_dir = parse_acf(os.path.join(folder, item))
if app_id and game_name:
install_path = os.path.join(folder, 'common', install_dir)
if os.path.exists(install_path):
return app_id, game_name, install_path
except Exception as e:
log_error(f"Error processing {item}: {e}")
return None
def find_steam_apps(library_folders):
acf_pattern = re.compile(r'^appmanifest_(\d+)\.acf$')
games = {}
with ThreadPoolExecutor() as executor:
futures = []
for folder in library_folders:
if os.path.exists(folder):
for item in os.listdir(folder):
if acf_pattern.match(item):
futures.append(
executor.submit(process_acf_file, folder, item)
)
for future in futures:
result = future.result()
if result:
app_id, game_name, install_path = result
cream_installed = 'Cream installed' if os.path.exists(os.path.join(install_path, 'cream.sh')) else ''
games[app_id] = (game_name, cream_installed, install_path)
return games
def parse_acf(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
data = file.read()
app_id = re.search(r'"appid"\s+"(\d+)"', data)
name = re.search(r'"name"\s+"([^"]+)"', data)
install_dir = re.search(r'"installdir"\s+"([^"]+)"', data)
return app_id.group(1), name.group(1), install_dir.group(1)
except Exception as e:
log_error(f"Error reading ACF file {file_path}: {e}")
return None, None, None
def fetch_dlc_details(app_id):
base_url = f"https://store.steampowered.com/api/appdetails?appids={app_id}"
try:
response = requests.get(base_url)
data = response.json()
if app_id not in data or "data" not in data[app_id]:
return []
game_data = data[app_id]["data"]
dlcs = game_data.get("dlc", [])
dlc_details = []
with tqdm(total=len(dlcs), desc="Fetching DLC details") as pbar:
for dlc_id in dlcs:
try:
time.sleep(0.3)
dlc_url = f"https://store.steampowered.com/api/appdetails?appids={dlc_id}"
dlc_response = requests.get(dlc_url)
if dlc_response.status_code == 200:
dlc_data = dlc_response.json()
if str(dlc_id) in dlc_data and "data" in dlc_data[str(dlc_id)]:
dlc_name = dlc_data[str(dlc_id)]["data"].get("name", "Unknown DLC")
dlc_details.append({"appid": dlc_id, "name": dlc_name})
elif dlc_response.status_code == 429:
time.sleep(10)
pbar.update(1)
except Exception as e:
log_error(f"Exception for DLC {dlc_id}: {str(e)}")
return dlc_details
except requests.exceptions.RequestException as e:
log_error(f"Failed to fetch DLC details for {app_id}: {e}")
return []
def install_files(app_id, game_install_dir, dlcs, game_name):
GREEN = '\033[92m'
YELLOW = '\033[93m'
RESET = '\033[0m'
zip_url = "https://github.com/anticitizn/creamlinux/releases/latest/download/creamlinux.zip"
zip_path = os.path.join(game_install_dir, 'creamlinux.zip')
try:
response = requests.get(zip_url)
if response.status_code == 200:
with open(zip_path, 'wb') as f:
f.write(response.content)
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(game_install_dir)
os.remove(zip_path)
cream_sh_path = os.path.join(game_install_dir, 'cream.sh')
os.chmod(cream_sh_path, os.stat(cream_sh_path).st_mode | stat.S_IEXEC)
dlc_list = "\n".join([f"{dlc['appid']} = {dlc['name']}" for dlc in dlcs])
cream_api_path = os.path.join(game_install_dir, 'cream_api.ini')
with open(cream_api_path, 'w') as f:
f.write(f"APPID = {app_id}\n[config]\nissubscribedapp_on_false_use_real = true\n[methods]\ndisable_steamapps_issubscribedapp = false\n[dlc]\n{dlc_list}")
print(f"Custom cream_api.ini has been written to {game_install_dir}.")
print(f"\n{GREEN}Installation complete!{RESET}")
print(f"\n{YELLOW}Set launch options in Steam:{RESET}")
print(f"{GREEN}'sh ./cream.sh %command%'{RESET} for {game_name}")
else:
log_error("Failed to download the files needed for installation.")
except Exception as e:
log_error(f"Failed to install files for {game_name}: {e}")
def uninstall_creamlinux(install_path, game_name):
YELLOW = '\033[93m'
GREEN = '\033[92m'
RESET = '\033[0m'
try:
files_to_remove = ['cream.sh', 'cream_api.ini', 'cream_api.so', 'lib32Creamlinux.so', 'lib64Creamlinux.so']
for file in files_to_remove:
file_path = os.path.join(install_path, file)
if os.path.exists(file_path):
os.remove(file_path)
print(f"\n{GREEN}Successfully uninstalled CreamLinux from {game_name}{RESET}")
print(f"\n{YELLOW}Make sure to remove{RESET} {GREEN}'sh ./cream.sh %command%'{RESET} {YELLOW}from launch options{RESET}")
except Exception as e:
log_error(f"Failed to uninstall CreamLinux: {e}")
def main():
parser = argparse.ArgumentParser(description="Steam DLC Fetcher")
parser.add_argument("--manual", metavar='steamapps_path', help="Sets the steamapps path for faster operation", required=False)
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
args = parser.parse_args()
setup_logging(args.debug)
show_header(app_version, args.debug)
try:
library_folders = find_steam_library_folders(args.manual)
if not library_folders:
print("Falling back to Manual Method since no library folder was found")
steamapps_path = input("Steamapps Path: ")
if len(steamapps_path) > 3:
library_folders = [steamapps_path]
else:
print("Invalid path! Closing the program...")
return
games = find_steam_apps(library_folders)
if games:
print("\nSelect the game for which you want to fetch DLCs:")
games_list = list(games.items())
GREEN = '\033[92m'
RESET = '\033[0m'
for idx, (app_id, (name, cream_status, _)) in enumerate(games_list, 1):
status = []
if cream_status:
status.append(f"{GREEN}Cream installed{RESET}")
status_str = f" ({', '.join(status)})" if status else ""
print(f"{idx}. {name} (App ID: {app_id}){status_str}")
choice = int(input("\nEnter the number of the game: ")) - 1
if 0 <= choice < len(games_list):
selected_app_id, (selected_game_name, cream_status, selected_install_dir) = games_list[choice]
if cream_status:
print("\nCreamLinux is installed. What would you like to do?")
print("1. Fetch DLC IDs")
print("2. Uninstall CreamLinux")
action = input("Enter your choice (1-2): ")
if action == "1":
print(f"\nSelected: {selected_game_name} (App ID: {selected_app_id})")
dlcs = fetch_dlc_details(selected_app_id)
if dlcs:
install_files(selected_app_id, selected_install_dir, dlcs, selected_game_name)
else:
print("No DLCs found for the selected game.")
elif action == "2":
uninstall_creamlinux(selected_install_dir, selected_game_name)
else:
print("Invalid choice.")
else:
print(f"\nSelected: {selected_game_name} (App ID: {selected_app_id})")
dlcs = fetch_dlc_details(selected_app_id)
if dlcs:
install_files(selected_app_id, selected_install_dir, dlcs, selected_game_name)
else:
print("No DLCs found for the selected game.")
else:
print("Invalid selection.")
else:
print("No Steam games found on this computer or connected drives.")
except Exception as e:
logging.exception("An error occurred:")
log_error(f"An error occurred: {e}")
if __name__ == "__main__":
main()

566
helper.py Normal file
View File

@@ -0,0 +1,566 @@
import os
import re
import requests
import zipfile
import time
import shutil
import stat
import subprocess
import logging
import json
from datetime import datetime
class SteamHelper:
def __init__(self, debug=False):
self.debug = debug
self.logger = None
self.config = None
# Only setup logging if debug is enabled - errors will setup logging on-demand
if debug:
self._setup_logging()
self.load_config()
def load_config(self):
"""Load configuration from config.json"""
script_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(script_dir, 'config.json')
# Create default config if it doesn't exist
if not os.path.exists(config_path):
default_config = {
"version": "v1.0.8",
"github_repo": "Novattz/creamlinux-installer",
"github_api": "https://api.github.com/repos/",
"creamlinux_release": "https://github.com/anticitizn/creamlinux/releases/latest/download/creamlinux.zip",
"smokeapi_release": "acidicoala/SmokeAPI",
}
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=4)
self.config = default_config
if self.debug:
self._log_debug("Created default config.json")
return
try:
with open(config_path, 'r') as f:
self.config = json.load(f)
if self.debug:
self._log_debug(f"Loaded config: {self.config}")
except Exception as e:
self._log_error(f"Failed to load config: {str(e)}")
raise
def _cleanup_old_logs(self, log_dir, keep_logs=5):
"""Clean up old log files, keeping only the most recent ones"""
try:
log_files = [os.path.join(log_dir, f) for f in os.listdir(log_dir)
if f.endswith('.log')]
if len(log_files) > keep_logs:
log_files.sort(key=lambda x: os.path.getmtime(x))
for f in log_files[:-keep_logs]:
os.remove(f)
except Exception:
pass # Silently fail cleanup since this is not critical
def _setup_logging(self):
"""Setup logging to file with detailed formatting"""
script_dir = os.path.dirname(os.path.abspath(__file__))
log_dir = os.path.join(script_dir, 'logs')
os.makedirs(log_dir, exist_ok=True)
self._cleanup_old_logs(log_dir)
log_file = os.path.join(log_dir, f'cream_installer_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG if self.debug else logging.ERROR)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
logger = logging.getLogger('cream_installer')
logger.handlers = []
logger.propagate = False
logger.setLevel(logging.DEBUG if self.debug else logging.ERROR)
logger.addHandler(file_handler)
self.logger = logger
if self.debug:
self.logger.debug("=== Session Started ===")
self.logger.debug(f"Debug mode enabled - Log file: {log_file}")
self.logger.debug(f"System: {os.uname().sysname if hasattr(os, 'uname') else os.name}")
self.logger.debug(f"Python version: {subprocess.check_output(['python', '--version']).decode().strip()}")
self.logger.debug("Checking for Steam installation...")
def _log_debug(self, message):
"""Log debug message if debug mode is enabled"""
if self.debug and not self.logger:
self._setup_logging()
if self.logger:
self.logger.debug(message)
def _log_error(self, message):
"""Log error message, setting up logging if needed"""
if not self.logger:
self._setup_logging()
self.logger.error(message)
def check_requirements(self):
"""Check if all required commands and packages are available"""
missing_commands = []
missing_packages = []
# Check commands
required_commands = ['which', 'steam']
for cmd in required_commands:
if not subprocess.run(['which', cmd], capture_output=True).returncode == 0:
missing_commands.append(cmd)
self._log_error(f"Required command not found: {cmd}")
# Check packages
required_packages = ['requests', 'argparse', 'rich', 'json']
for package in required_packages:
try:
__import__(package)
except ImportError:
missing_packages.append(package)
self._log_error(f"Required Python package not found: {package}")
if missing_commands or missing_packages:
error_details = []
if missing_commands:
cmd_list = ', '.join(missing_commands)
error_details.append(f"Missing commands: {cmd_list}")
if missing_packages:
pkg_list = ', '.join(missing_packages)
error_details.append(f"Missing Python packages: {pkg_list}")
error_details.append("Install them using: pip install " + ' '.join(missing_packages))
raise RequirementsError("\n".join(error_details))
return True
def _is_excluded_app(self, app_id, name):
"""Check if the app should be excluded from the game list"""
excluded_ids = {
'228980', # Steamworks Common Redistributables
'1070560', # Steam Linux Runtime
'1391110', # Steam Linux Runtime - Soldier
'1628350', # Steam Linux Runtime - Sniper
'1493710', # Proton Experimental
'1826330' # Steam Linux Runtime - Scout
}
excluded_patterns = [
r'Proton \d+\.\d+',
r'Steam Linux Runtime',
r'Steamworks Common'
]
if app_id in excluded_ids:
return True
for pattern in excluded_patterns:
if re.match(pattern, name, re.IGNORECASE):
return True
return False
def _read_steam_registry(self):
"""Read Steam registry file"""
registry_path = os.path.expanduser('~/.steam/registry.vdf')
if os.path.exists(registry_path):
self._log_debug(f"Found Steam registry file: {registry_path}")
with open(registry_path, 'r') as f:
content = f.read()
install_path = re.search(r'"InstallPath"\s*"([^"]+)"', content)
if install_path:
return install_path.group(1)
return None
def _parse_vdf(self, file_path):
"""Parse Steam library folders VDF file"""
library_paths = []
try:
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
paths = re.findall(r'"path"\s*"(.*?)"', content, re.IGNORECASE)
library_paths.extend([os.path.normpath(path) for path in paths])
except Exception as e:
self._log_error(f"Failed to read {file_path}: {str(e)}")
return library_paths
def _find_steam_binary(self):
"""Find Steam binary location"""
try:
result = subprocess.run(['which', 'steam'], stdout=subprocess.PIPE)
steam_path = result.stdout.decode('utf-8').strip()
if steam_path:
return os.path.dirname(steam_path)
except Exception as e:
self._log_error(f"Failed to locate steam binary: {str(e)}")
return None
def find_steam_library_folders(self, manual_path=""):
"""Find all Steam library folders"""
self._log_debug("Starting Steam library folder search")
search_list = [
os.path.expanduser('~/.steam/steam'),
os.path.expanduser('~/.local/share/Steam'),
os.path.expanduser('/home/deck/.steam/steam'),
os.path.expanduser('/home/deck/.local/share/Steam'),
'/mnt/Jogos/Steam',
'/run/media/mmcblk0p1',
os.path.expanduser('~/.var/app/com.valvesoftware.Steam/.local/share/Steam'),
os.path.expanduser('~/.var/app/com.valvesoftware.Steam/data/Steam/steamapps/common')
]
library_folders = []
try:
if manual_path:
self._log_debug(f"Manual game path provided: {manual_path}")
if os.path.exists(manual_path):
self._log_debug("Manual path exists, adding to library folders")
library_folders.append(manual_path)
else:
self._log_debug(f"Manual path does not exist: {manual_path}")
return library_folders
steam_binary_path = self._find_steam_binary()
if steam_binary_path:
self._log_debug(f"Found Steam binary at: {steam_binary_path}")
if steam_binary_path not in search_list:
search_list.append(steam_binary_path)
steam_install_path = self._read_steam_registry()
if steam_install_path:
self._log_debug(f"Found Steam installation path in registry: {steam_install_path}")
if steam_install_path not in search_list:
search_list.append(steam_install_path)
self._log_debug("Searching for Steam library folders in all potential locations")
for search_path in search_list:
self._log_debug(f"Checking path: {search_path}")
if os.path.exists(search_path):
steamapps_path = str(os.path.normpath(f"{search_path}/steamapps"))
if os.path.exists(steamapps_path):
self._log_debug(f"Found valid steamapps folder: {steamapps_path}")
library_folders.append(steamapps_path)
vdf_path = os.path.join(steamapps_path, 'libraryfolders.vdf')
if os.path.exists(vdf_path):
self._log_debug(f"Found libraryfolders.vdf at: {vdf_path}")
additional_paths = self._parse_vdf(vdf_path)
for path in additional_paths:
new_steamapps_path = os.path.join(path, 'steamapps')
if os.path.exists(new_steamapps_path):
self._log_debug(f"Found additional library folder: {new_steamapps_path}")
library_folders.append(new_steamapps_path)
self._log_debug(f"Found {len(library_folders)} total library folders")
for folder in library_folders:
self._log_debug(f"Library folder: {folder}")
except Exception as e:
self._log_error(f"Error finding Steam library folders: {e}")
self._log_debug(f"Stack trace:", exc_info=True)
return library_folders
def _parse_acf(self, file_path):
"""Parse Steam ACF file"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
data = file.read()
app_id = re.search(r'"appid"\s+"(\d+)"', data)
name = re.search(r'"name"\s+"([^"]+)"', data)
install_dir = re.search(r'"installdir"\s+"([^"]+)"', data)
return app_id.group(1), name.group(1), install_dir.group(1)
except Exception as e:
self._log_error(f"Error reading ACF file {file_path}: {e}")
return None, None, None
def _check_proton_status(self, install_path):
"""
Check if a game requires Proton by looking for .exe files and Steam API DLLs
Returns: (needs_proton, steam_api_files)
"""
try:
has_exe = False
steam_api_files = []
steam_api_patterns = ['steam_api.dll', 'steam_api64.dll']
for root, _, files in os.walk(install_path):
# Check for .exe files
if not has_exe and any(file.lower().endswith('.exe') for file in files):
has_exe = True
# Check for Steam API files
for file in files:
if file.lower() in steam_api_patterns:
steam_api_files.append(os.path.relpath(os.path.join(root, file), install_path))
# If we found both, we can stop searching
if has_exe and steam_api_files:
break
return has_exe, steam_api_files
except Exception as e:
self._log_error(f"Error checking Proton status: {e}")
return False, []
def find_steam_apps(self, library_folders):
"""Find all Steam apps in library folders"""
self._log_debug("Starting Steam apps search")
acf_pattern = re.compile(r'^appmanifest_(\d+)\.acf$')
games = {}
for folder in library_folders:
self._log_debug(f"Searching for games in: {folder}")
if os.path.exists(folder):
for item in os.listdir(folder):
if acf_pattern.match(item):
app_id, game_name, install_dir = self._parse_acf(os.path.join(folder, item))
if app_id and game_name and not self._is_excluded_app(app_id, game_name):
install_path = os.path.join(folder, 'common', install_dir)
if os.path.exists(install_path):
cream_installed = os.path.exists(os.path.join(install_path, 'cream.sh'))
needs_proton, steam_api_files = self._check_proton_status(install_path)
smoke_installed = self.check_smokeapi_status(install_path, steam_api_files) if needs_proton else False
games[app_id] = (
game_name, # [0] Name
cream_installed, # [1] CreamLinux status
install_path, # [2] Install path
needs_proton, # [3] Proton status
steam_api_files, # [4] Steam API files
smoke_installed # [5] SmokeAPI status
)
self._log_debug(f"Found game: {game_name} (App ID: {app_id})")
self._log_debug(f" Path: {install_path}")
self._log_debug(f" Status: Cream={cream_installed}, Proton={needs_proton}, Smoke={smoke_installed}")
if steam_api_files:
self._log_debug(f" Steam API files: {', '.join(steam_api_files)}")
self._log_debug(f"Found {len(games)} total games")
return games
def fetch_dlc_details(self, app_id, progress_callback=None):
"""Fetch DLC details for a game"""
base_url = f"https://store.steampowered.com/api/appdetails?appids={app_id}"
try:
response = requests.get(base_url)
data = response.json()
if str(app_id) not in data:
return []
app_data = data[str(app_id)]
if not app_data.get('success') or 'data' not in app_data:
return []
game_data = app_data['data']
dlcs = game_data.get("dlc", [])
dlc_details = []
total_dlcs = len(dlcs)
for index, dlc_id in enumerate(dlcs):
try:
time.sleep(0.3)
dlc_url = f"https://store.steampowered.com/api/appdetails?appids={dlc_id}"
dlc_response = requests.get(dlc_url)
if dlc_response.status_code == 200:
dlc_data = dlc_response.json()
if str(dlc_id) in dlc_data and "data" in dlc_data[str(dlc_id)]:
dlc_name = dlc_data[str(dlc_id)]["data"].get("name", "Unknown DLC")
dlc_details.append({"appid": dlc_id, "name": dlc_name})
elif dlc_response.status_code == 429:
time.sleep(10)
if progress_callback:
progress_callback(index + 1, total_dlcs)
except Exception as e:
self._log_error(f"Error fetching DLC {dlc_id}: {str(e)}")
return dlc_details
except requests.exceptions.RequestException as e:
self._log_error(f"Failed to fetch DLC details: {str(e)}")
return []
def install_creamlinux(self, app_id, game_install_dir, dlcs):
"""Install CreamLinux for a game"""
try:
zip_url = self.config['creamlinux_release']
zip_path = os.path.join(game_install_dir, 'creamlinux.zip')
self._log_debug(f"Downloading CreamLinux from {zip_url}")
response = requests.get(zip_url)
if response.status_code != 200:
raise InstallationError(f"Failed to download CreamLinux (HTTP {response.status_code})")
self._log_debug(f"Writing zip file to {zip_path}")
with open(zip_path, 'wb') as f:
f.write(response.content)
self._log_debug("Extracting CreamLinux files")
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(game_install_dir)
except zipfile.BadZipFile:
raise InstallationError("Downloaded file is corrupted. Please try again.")
os.remove(zip_path)
cream_sh_path = os.path.join(game_install_dir, 'cream.sh')
self._log_debug(f"Setting permissions for {cream_sh_path}")
try:
os.chmod(cream_sh_path, os.stat(cream_sh_path).st_mode | stat.S_IEXEC)
except OSError as e:
raise InstallationError(f"Failed to set execute permissions: {str(e)}")
cream_api_path = os.path.join(game_install_dir, 'cream_api.ini')
self._log_debug(f"Creating config at {cream_api_path}")
try:
dlc_list = "\n".join([f"{dlc['appid']} = {dlc['name']}" for dlc in dlcs])
with open(cream_api_path, 'w') as f:
f.write(f"APPID = {app_id}\n[config]\nissubscribedapp_on_false_use_real = true\n[methods]\ndisable_steamapps_issubscribedapp = false\n[dlc]\n{dlc_list}")
except IOError as e:
raise InstallationError(f"Failed to create config file: {str(e)}")
return True
except Exception as e:
self._log_error(f"Installation failed: {str(e)}")
if isinstance(e, InstallationError):
raise
raise InstallationError(f"Installation failed: {str(e)}")
def uninstall_creamlinux(self, install_path):
"""Uninstall CreamLinux from a game"""
try:
files_to_remove = ['cream.sh', 'cream_api.ini', 'cream_api.so', 'lib32Creamlinux.so', 'lib64Creamlinux.so']
for file in files_to_remove:
file_path = os.path.join(install_path, file)
if os.path.exists(file_path):
os.remove(file_path)
return True
except Exception as e:
self._log_error(f"Uninstallation failed: {str(e)}")
return False
def install_smokeapi(self, install_path, steam_api_files):
"""Install SmokeAPI for a Proton game"""
try:
# Construct the correct URL using latest version
response = requests.get(
f"{self.config['github_api']}{self.config['smokeapi_release']}/releases/latest"
)
if response.status_code != 200:
raise InstallationError("Failed to fetch latest SmokeAPI version")
latest_release = response.json()
latest_version = latest_release['tag_name']
zip_url = (
f"https://github.com/{self.config['smokeapi_release']}/releases/download/"
f"{latest_version}/SmokeAPI-{latest_version}.zip"
)
zip_path = os.path.join(install_path, 'smokeapi.zip')
self._log_debug(f"Downloading SmokeAPI from {zip_url}")
response = requests.get(zip_url)
if response.status_code != 200:
raise InstallationError(f"Failed to download SmokeAPI (HTTP {response.status_code})")
self._log_debug(f"Writing zip file to {zip_path}")
with open(zip_path, 'wb') as f:
f.write(response.content)
self._log_debug("Extracting SmokeAPI files")
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
for api_file in steam_api_files:
api_dir = os.path.dirname(os.path.join(install_path, api_file))
api_name = os.path.basename(api_file)
# Backup original file
original_path = os.path.join(api_dir, api_name)
backup_path = os.path.join(api_dir, api_name.replace('.dll', '_o.dll'))
self._log_debug(f"Processing {api_file}:")
self._log_debug(f" Original: {original_path}")
self._log_debug(f" Backup: {backup_path}")
# Only backup if not already backed up
if not os.path.exists(backup_path):
shutil.move(original_path, backup_path)
# Extract the appropriate DLL directly to the game directory
zip_ref.extract(api_name, api_dir)
self._log_debug(f" Installed SmokeAPI as: {original_path}")
except zipfile.BadZipFile:
raise InstallationError("Downloaded file is corrupted. Please try again.")
os.remove(zip_path)
return True
except Exception as e:
self._log_error(f"SmokeAPI installation failed: {str(e)}")
raise InstallationError(f"Failed to install SmokeAPI: {str(e)}")
def uninstall_smokeapi(self, install_path, steam_api_files):
"""Uninstall SmokeAPI and restore original files"""
try:
for api_file in steam_api_files:
api_dir = os.path.dirname(os.path.join(install_path, api_file))
api_name = os.path.basename(api_file)
original_path = os.path.join(api_dir, api_name)
backup_path = os.path.join(api_dir, api_name.replace('.dll', '_o.dll'))
if os.path.exists(backup_path):
if os.path.exists(original_path):
os.remove(original_path)
shutil.move(backup_path, original_path)
self._log_debug(f"Restored original file: {original_path}")
return True
except Exception as e:
self._log_error(f"SmokeAPI uninstallation failed: {str(e)}")
return False
def check_smokeapi_status(self, install_path, steam_api_files):
"""Check if SmokeAPI is installed"""
try:
for api_file in steam_api_files:
backup_path = os.path.join(
install_path,
os.path.dirname(api_file),
os.path.basename(api_file).replace('.dll', '_o.dll')
)
if os.path.exists(backup_path):
return True
return False
except Exception as e:
self._log_error(f"Error checking SmokeAPI status: {str(e)}")
return False
class RequirementsError(Exception):
"""Raised when system requirements are not met"""
pass
class NetworkError(Exception):
"""Raised when network-related operations fail"""
pass
class InstallationError(Exception):
"""Raised when installation operations fail"""
pass

201
main.py Executable file
View File

@@ -0,0 +1,201 @@
import argparse
import os
from helper import SteamHelper, RequirementsError, NetworkError, InstallationError
from ui_handler import UIHandler
from updater import check_for_updates, UpdateError
def handle_dlc_operation(ui, helper, app_id, game_name, install_dir):
"""Handle DLC fetching and installation"""
ui.show_info(f"\nSelected: {game_name} (App ID: {app_id})")
with ui.create_progress_context() as progress:
progress_task = progress.add_task("🔍 Fetching DLC details...", total=None)
def update_progress(current, total):
progress.update(progress_task, completed=current, total=total)
dlcs = helper.fetch_dlc_details(app_id, update_progress)
if dlcs:
ui.show_dlc_table(dlcs)
if ui.get_user_confirmation("\nProceed with installation?"):
with ui.create_status_context("Installing CreamLinux..."):
success = helper.install_creamlinux(app_id, install_dir, dlcs)
if success:
ui.show_success("Installation complete!")
ui.show_launch_options(game_name)
else:
ui.show_warning("No DLCs found for this game.")
def handle_smokeapi_operation(ui, helper, install_path, steam_api_files, game_name, is_install=True):
"""Handle SmokeAPI installation/uninstallation"""
operation = "installation" if is_install else "uninstallation"
ui.show_info(f"\nProceeding with SmokeAPI {operation} for {game_name}")
try:
with ui.create_status_context(f"{'Installing' if is_install else 'Uninstalling'} SmokeAPI..."):
if is_install:
success = helper.install_smokeapi(install_path, steam_api_files)
else:
success = helper.uninstall_smokeapi(install_path, steam_api_files)
if success:
ui.show_success(f"Successfully {'installed' if is_install else 'uninstalled'} SmokeAPI!")
else:
ui.show_error(f"Failed to {'install' if is_install else 'uninstall'} SmokeAPI")
except Exception as e:
ui.show_error(str(e))
def main():
parser = argparse.ArgumentParser(description="Steam DLC Fetcher")
parser.add_argument("--manual", metavar='steamapps_path', help="Sets the steamapps path for faster operation", required=False)
parser.add_argument("--debug", action="store_true", help="Enable debug logging")
parser.add_argument("--no-update", action="store_true", help="Skip update check")
args = parser.parse_args()
ui = UIHandler(debug=args.debug)
helper = SteamHelper(debug=args.debug)
try:
if not args.no_update:
try:
if check_for_updates(ui, helper):
return # Exit if update was performed
except UpdateError as e:
ui.show_error(f"Update failed: {str(e)}")
if not ui.get_user_confirmation("Would you like to continue anyway?"):
return
# Use version from config instead of fetching
app_version = helper.config['version']
ui.show_header(app_version, args.debug)
helper.check_requirements()
except RequirementsError as e:
ui.show_error("Missing dependencies:", show_details=str(e))
return
try:
with ui.create_status_context("Finding Steam library folders..."):
library_folders = helper.find_steam_library_folders(args.manual)
if not library_folders:
if args.manual:
ui.show_error(f"Could not find Steam library at specified path: {args.manual}")
else:
ui.show_warning("No Steam library folders found. Please enter the path manually.")
steamapps_path = ui.get_user_input("Enter Steamapps Path")
if len(steamapps_path) > 3 and os.path.exists(steamapps_path):
library_folders = [steamapps_path]
else:
ui.show_error("Invalid path or path does not exist!")
return
while True:
# Refresh games list at the start of each loop
with ui.create_status_context("Scanning for games..."):
games = helper.find_steam_apps(library_folders)
if not games:
ui.show_error("No Steam games found.")
return
games_list = list(games.items())
ui.show_games_table(games_list)
try:
ui.console.print("\n[dim]Enter game number or 'q' to quit[/dim]")
user_input = ui.get_user_input("Select game number")
if user_input.lower() == 'q':
return
choice = int(user_input) - 1
if not (0 <= choice < len(games_list)):
ui.show_error("Invalid selection.")
continue
# Show the selected game and options
ui.clear_screen()
ui.show_header(app_version, args.debug)
ui.show_games_table(games_list, choice)
# Get game's status
is_installed = games_list[choice][1][1] # cream_status
needs_proton = games_list[choice][1][3] # needs_proton
steam_api_files = games_list[choice][1][4] # steam_api_files
smoke_status = games_list[choice][1][5] # smoke_status
game_info = (games_list[choice][0], games_list[choice][1])
# Different choices based on installation status and game type
if needs_proton and steam_api_files:
if smoke_status:
max_options = 2 # Uninstall and Go Back
action = ui.get_user_input("\nChoose action", choices=["1", "2"])
if action == "2": # Go back
ui.clear_screen()
ui.show_header(app_version, args.debug)
continue
if action == "1": # Uninstall SmokeAPI
handle_smokeapi_operation(ui, helper, game_info[1][2], steam_api_files, game_info[1][0], False)
else:
max_options = 2 # Install and Go Back
action = ui.get_user_input("\nChoose action", choices=["1", "2"])
if action == "2": # Go back
ui.clear_screen()
ui.show_header(app_version, args.debug)
continue
if action == "1": # Install SmokeAPI
handle_smokeapi_operation(ui, helper, game_info[1][2], steam_api_files, game_info[1][0], True)
else:
# Handle non-Proton games (original logic)
if is_installed:
action = ui.get_user_input("\nChoose action", choices=["1", "2", "3"])
if action == "3": # Go back
ui.clear_screen()
ui.show_header(app_version, args.debug)
continue
if action == "1": # Fetch DLCs
handle_dlc_operation(ui, helper, game_info[0], game_info[1][0], game_info[1][2])
else: # Uninstall
if ui.get_user_confirmation("\nAre you sure you want to uninstall CreamLinux?"):
with ui.create_status_context("Uninstalling CreamLinux..."):
success = helper.uninstall_creamlinux(game_info[1][2])
if success:
ui.show_success(f"Successfully uninstalled CreamLinux from {game_info[1][0]}")
ui.show_uninstall_reminder()
else:
action = ui.get_user_input("\nChoose action", choices=["1", "2"])
if action == "2": # Go back
ui.clear_screen()
ui.show_header(app_version, args.debug)
continue
# Proceed with DLC operation
handle_dlc_operation(ui, helper, game_info[0], game_info[1][0], game_info[1][2])
# After any operation, ask if user wants to continue
if ui.get_user_confirmation("\nWould you like to perform another operation?"):
ui.clear_screen()
ui.show_header(app_version, args.debug)
continue
else:
break
except ValueError:
ui.show_error("Invalid input. Please enter a number.")
continue
except Exception as e:
if isinstance(e, (RequirementsError, NetworkError, InstallationError)):
ui.show_error(str(e))
else:
helper._log_error(f"Unexpected error: {str(e)}")
ui.show_error(f"An unexpected error occurred: {str(e)}", show_exception=args.debug)
if __name__ == "__main__":
main()

183
ui_handler.py Normal file
View File

@@ -0,0 +1,183 @@
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.table import Table
from rich.prompt import Prompt, Confirm
from rich.text import Text
from rich import box
class UIHandler:
def __init__(self, debug=False):
self.console = Console()
self.debug = debug
def clear_screen(self):
"""Clear the console screen"""
import os
os.system('cls' if os.name == 'nt' else 'clear')
def show_header(self, app_version, debug_mode):
"""Display the application header"""
self.clear_screen()
logo = r"""
_ _ _ _ _ ____ _____ _ ________ _____
| | | | \ | | | / __ \ / ____| |/ / ____| __ \
| | | | \| | | | | | | | | ' /| |__ | |__) |
| | | | . ` | | | | | | | | < | __| | _ /
| |__| | |\ | |___| |__| | |____| . \| |____| | \ \
\____/|_| \_|______\____/ \_____|_|\_\______|_| \_\\
"""
info_text = f"\n> Made by Tickbase\n> GitHub: https://github.com/Novattz/creamlinux-installer\n> Version: {app_version}"
if debug_mode:
info_text += "\n[red][Running in DEBUG mode][/red]"
self.console.print(
Panel.fit(
Text.from_markup(f"{logo}\n{info_text}"),
style="cyan",
border_style="cyan",
box=box.ROUNDED,
)
)
def show_games_table(self, games_list, selected_idx=None):
"""Display the table of available games"""
table = Table(show_header=True, header_style="cyan", box=box.ROUNDED)
table.add_column("#", style="dim")
table.add_column("Game Name")
table.add_column("Type", style="dim")
table.add_column("Status")
for idx, (app_id, (name, cream_status, _, needs_proton, _, smoke_status)) in enumerate(games_list, 1):
if needs_proton:
status = "[green]✓ Smoke installed[/green]" if smoke_status else "[yellow]Not Installed[/yellow]"
else:
status = "[green]✓ Cream installed[/green]" if cream_status else "[yellow]Not Installed[/yellow]"
game_type = "[blue]Proton[/blue]" if needs_proton else "[green]Native[/green]"
# Highlight only the game name if selected
if selected_idx is not None and idx == selected_idx + 1:
name = f"[bold white on blue]{name}[/bold white on blue]"
table.add_row(
f"[bold cyan]{idx}[/bold cyan]",
name,
game_type,
status
)
self.console.print("\n[cyan]Available Games:[/cyan]")
self.console.print(table)
if selected_idx is not None:
# Get selected game details
_, (game_name, cream_status, install_path, needs_proton, steam_api_files, smoke_status) = games_list[selected_idx]
if needs_proton:
status = "[green]✓ Smoke installed[/green]" if smoke_status else "[yellow]Not Installed[/yellow]"
else:
status = "[green]✓ Cream installed[/green]" if cream_status else "[yellow]Not Installed[/yellow]"
game_type = "[blue]Proton[/blue]" if needs_proton else "[green]Native[/green]"
app_id = games_list[selected_idx][0]
# Show footer with selected game info
self.console.print(f"[dim]Selected: {game_name} (Type: {game_type}) - Status: {status}[/dim]")
# Show Steam API files if present for Proton games
if needs_proton and steam_api_files:
self.console.print("\n[cyan]Steam API files found:[/cyan]")
for api_file in steam_api_files:
self.console.print(f"[dim]- {api_file}[/dim]")
# Show options based on game type and status
self.console.print("\n[cyan]Selected Game Options:[/cyan]")
options = []
if needs_proton and steam_api_files:
if smoke_status:
options.append("1. Uninstall SmokeAPI")
options.append("2. Go Back")
else:
options.append("1. Install SmokeAPI")
options.append("2. Go Back")
else:
if cream_status:
options.append("1. Fetch DLCs")
options.append("2. Uninstall CreamLinux")
options.append("3. Go Back")
else:
options.append("1. Install CreamLinux")
options.append("2. Go Back")
for option in options:
self.console.print(option)
def show_dlc_table(self, dlcs):
"""Display the table of available DLCs"""
table = Table(show_header=True, header_style="cyan", box=box.ROUNDED)
table.add_column("#", style="dim")
table.add_column("DLC Name")
table.add_column("DLC ID", style="dim")
for idx, dlc in enumerate(dlcs, 1):
table.add_row(
f"[bold cyan]{idx}[/bold cyan]",
dlc['name'],
str(dlc['appid'])
)
self.console.print(Panel.fit(table, title="[bold cyan]📦 Found DLCs[/bold cyan]", border_style="cyan"))
def create_progress_context(self):
"""Create and return a progress context"""
return Progress()
def show_error(self, message, show_exception=False):
"""Display an error message"""
self.console.print(f"[red]{message}[/red]")
if show_exception:
self.console.print_exception()
def show_warning(self, message):
"""Display a warning message"""
self.console.print(f"[yellow]{message}[/yellow]")
def show_success(self, message):
"""Display a success message"""
self.console.print(f"[green]✓ {message}[/green]")
def show_info(self, message):
"""Display an info message"""
self.console.print(f"[cyan]{message}[/cyan]")
def create_status_context(self, message):
"""Create and return a status context"""
return self.console.status(f"[cyan]{message}", spinner="dots")
def get_user_input(self, prompt_message, choices=None):
"""Get user input with optional choices"""
if choices:
return Prompt.ask(prompt_message, choices=choices)
return Prompt.ask(prompt_message)
def get_user_confirmation(self, message):
"""Get user confirmation"""
return Confirm.ask(message)
def show_cream_options(self, game_name):
"""Show CreamLinux options for installed games"""
self.show_info(f"\nCreamLinux is installed for {game_name}")
self.console.print("1. Fetch DLC IDs")
self.console.print("2. Uninstall CreamLinux")
def show_launch_options(self, game_name):
"""Show launch options after installation"""
self.console.print("\n[yellow]Steam Launch Options:[/yellow]")
self.console.print(f"[green]sh ./cream.sh %command%[/green] for {game_name}")
def show_uninstall_reminder(self):
"""Show reminder about launch options after uninstall"""
self.console.print("\n[yellow]Remember to remove[/yellow] [green]'sh ./cream.sh %command%'[/green] [yellow]from launch options[/yellow]")

136
updater.py Normal file
View File

@@ -0,0 +1,136 @@
import os
import sys
import shutil
import requests
import tempfile
import subprocess
from zipfile import ZipFile
from helper import SteamHelper
class UpdateError(Exception):
"""Raised when update operations fail"""
pass
def check_for_updates(ui_handler, helper):
"""
Check for updates and handle the update process if needed
Returns True if update was performed, False otherwise
"""
try:
helper._log_debug("Starting update check")
helper._log_debug(f"Current version: {helper.config['version']}")
helper._log_debug(f"Checking for updates from: {helper.config['github_repo']}")
# Get latest release info from GitHub
api_url = f"{helper.config['github_api']}{helper.config['github_repo']}/releases/latest"
helper._log_debug(f"Fetching from: {api_url}")
response = requests.get(api_url)
latest_release = response.json()
latest_version = latest_release['tag_name']
helper._log_debug(f"Latest version found: {latest_version}")
if latest_version != helper.config['version']:
ui_handler.show_info(f"\nNew version available: {latest_version}")
ui_handler.show_info(f"Current version: {helper.config['version']}")
if ui_handler.get_user_confirmation("Would you like to update?"):
perform_update(latest_release, ui_handler, helper)
# Update config with new version
helper.config['version'] = latest_version
helper.save_config()
helper._log_debug("Updated config.json with new version")
return True
return False
except requests.exceptions.RequestException as e:
helper._log_error(f"Failed to check for updates: {str(e)}")
ui_handler.show_warning(f"Failed to check for updates: {str(e)}")
return False
def perform_update(release_info, ui_handler, helper):
"""Download and install the update"""
script_path = os.path.abspath(sys.argv[0])
script_dir = os.path.dirname(script_path)
helper._log_debug(f"Script directory: {script_dir}")
try:
# Find the asset URL
zip_asset = next((asset for asset in release_info['assets']
if asset['name'].endswith('.zip')), None)
if not zip_asset:
helper._log_debug("No zip asset found in release")
raise UpdateError("No valid update package found")
helper._log_debug(f"Found update package: {zip_asset['name']}")
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
helper._log_debug(f"Created temp directory: {temp_dir}")
# Download the update
with ui_handler.create_status_context("Downloading update..."):
response = requests.get(zip_asset['browser_download_url'], stream=True)
zip_path = os.path.join(temp_dir, 'update.zip')
helper._log_debug(f"Downloading to: {zip_path}")
with open(zip_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Extract update
with ui_handler.create_status_context("Installing update..."):
helper._log_debug("Extracting update files")
with ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Create backup of current files
backup_dir = os.path.join(script_dir, 'backup')
os.makedirs(backup_dir, exist_ok=True)
helper._log_debug(f"Created backup directory: {backup_dir}")
# Copy current files to backup
for file in os.listdir(script_dir):
if file.endswith('.py') or file == 'config.json':
helper._log_debug(f"Backing up: {file}")
shutil.copy2(
os.path.join(script_dir, file),
os.path.join(backup_dir, file)
)
# Copy new files
for file in os.listdir(temp_dir):
if file.endswith('.py'):
helper._log_debug(f"Installing new file: {file}")
shutil.copy2(
os.path.join(temp_dir, file),
os.path.join(script_dir, file)
)
ui_handler.show_success("Update completed successfully!")
ui_handler.show_info("Restarting application...")
helper._log_debug("Preparing to restart application")
# Restart the application
python = sys.executable
os.execl(python, python, *sys.argv)
except Exception as e:
helper._log_error(f"Update failed: {str(e)}")
# Attempt to restore from backup if available
backup_dir = os.path.join(script_dir, 'backup')
if os.path.exists(backup_dir):
helper._log_debug("Attempting to restore from backup")
for file in os.listdir(backup_dir):
helper._log_debug(f"Restoring: {file}")
shutil.copy2(
os.path.join(backup_dir, file),
os.path.join(script_dir, file)
)
shutil.rmtree(backup_dir)
raise UpdateError(f"Update failed: {str(e)}")