From 704b744b9626a4db634f8c84f48f50f78f5b811d Mon Sep 17 00:00:00 2001 From: timoxa0 Date: Mon, 6 May 2024 21:14:10 +0500 Subject: [PATCH] Initial commit --- .python-version | 1 + LICENSE | 674 +++++++++++++++++++++++++++++++++++ | 2 + lon_deployer/ | 85 +++++ lon_deployer/ | 1 + lon_deployer/ | 343 ++++++++++++++++++ lon_deployer/ | 17 + lon_deployer/ | 175 +++++++++ poetry.lock | 692 ++++++++++++++++++++++++++++++++++++ pyproject.toml | 29 ++ | 2 + tests/ | 0 tests/ | 8 + 13 files changed, 2029 insertions(+) create mode 100644 .python-version create mode 100644 LICENSE create mode 100644 create mode 100644 lon_deployer/ create mode 100644 lon_deployer/ create mode 100644 lon_deployer/ create mode 100644 lon_deployer/ create mode 100644 lon_deployer/ create mode 100644 poetry.lock create mode 100644 pyproject.toml create mode 100644 create mode 100644 tests/ create mode 100644 tests/ diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..2c07333 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.11 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f288702 --- /dev/null +++ b/LICENSE @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 But first, please read +. diff --git a/ b/ new file mode 100644 index 0000000..4462301 --- /dev/null +++ b/ @@ -0,0 +1,2 @@ +# Linux on Nabu Deployer +#### Python tool for installing linux on xiaomi pad 5 \ No newline at end of file diff --git a/lon_deployer/ b/lon_deployer/ new file mode 100644 index 0000000..19d6dcf --- /dev/null +++ b/lon_deployer/ @@ -0,0 +1,85 @@ +import os +import json +import urllib.parse +import requests +import hashlib +from os import path as op +from os import getcwd as pwd +from sys import exit +from rich.console import Console + +from .utils import get_progress + +console = Console(log_path=False) + + +class File: + def __init__(self, url: str): + = url.split("/")[-1] + self.url = url + self.filepath = op.join(pwd(), "files/", + if not op.exists(op.join(pwd(), "files/")): + os.mkdir(op.join(pwd(), "files/")) + elif not op.isdir(op.join(pwd(), "files/")): + os.remove(op.join(pwd(), "files/")) + os.mkdir(op.join(pwd(), "files/")) + + def md5sum(self) -> str | None: + try: + print(urllib.parse.urlparse(self.url).path) + return json.loads(requests.get(f"{urllib.parse.urlparse(self.url).path}") + .content.decode())["hashes"]["md5"] + except requests.ConnectionError: + pass + + def get(self): + while True: + md5sum = self.md5sum() + if not md5sum: + console.log(f"Unable to verify {} checksum") + + if op.exists(self.filepath): + with open(self.filepath, "rb") as file: + data = + if hashlib.md5(data).hexdigest() == md5sum or not md5sum: + return data + + download_image = b"" + r = requests.get(self.url, stream=True) + if r.status_code != 200: + console.log(f"{} not found on server. Please contact developer") + total_size = int(r.headers.get("content-length", 0)) + block_size = 204800 + with get_progress() as pbar: + task = pbar.add_task(f"[green]Downloading {}", total=total_size) + for data in r.iter_content(block_size): + download_image += data + pbar.update(task, advance=len(data)) + with open(self.filepath, "wb") as file: + file.write(download_image) + + if hashlib.md5(download_image).hexdigest() == md5sum or not md5sum: + return download_image + else: + console.log("Downloaded file corrupted!") + + +OrangeFox = File( + url="" +) + +UEFI_Payload = File( + url="" +) + +BootShim = File( + url="" +) + +GPT_Both0 = File( + url="" +) + +UserData_Empty = File( + url="" +) diff --git a/lon_deployer/ b/lon_deployer/ new file mode 100644 index 0000000..b6e690f --- /dev/null +++ b/lon_deployer/ @@ -0,0 +1 @@ +from . import * diff --git a/lon_deployer/ b/lon_deployer/ new file mode 100644 index 0000000..7ec0f93 --- /dev/null +++ b/lon_deployer/ @@ -0,0 +1,343 @@ +import argparse +import re +import signal +import subprocess +import threading +import magic +from os import getcwd as pwd, remove +from os import path as op +from sys import exit +from time import sleep + +import adbutils +import +from rich.console import Console +from rich.prompt import Prompt + +from . import Files +from .utils import check_device, get_port, flash_boot, boot_ofox, clean_device, wait_for_bootloader, check_parts, \ + restore_parts, repartition, get_progress, list_fb_devices, reboot_fb_device + +console = Console(log_path=False) + +exit_counter = 0 + +adb: adbutils.AdbClient | None = None + + +def handle_sigint(*_) -> None: + global exit_counter + if exit_counter == 2: + console.log("CTRL+C pressed 3 times. Exiting") + exit(1) + else: + console.log(f"Press CTRL+C {2 - exit_counter} more {'time' if exit_counter == 1 else 'times'} to exit") + exit_counter += 1 + + +def main() -> int: + global adb + + signal.signal(signal.SIGINT, handle_sigint) + + parser = argparse.ArgumentParser( + description="Linux on Nabu deployer" + ) + + parser.add_argument( + "-d", "--device-serial", + help="Device serial" + ) + + parser.add_argument( + "-u", "--username", + help="User name" + ) + + parser.add_argument( + "-p", "--password", + help="User password" + ) + + parser.add_argument( + "RootFS", + help="RootFS image" + ) + + parser.add_argument( + "-S", "--part-size", + help="linux partition size in percents" + ) + + args = parser.parse_args() + + rootfs = op.abspath(args.RootFS) + try: + if magic.Magic(mime=True).from_file(rootfs) != "application/octet-stream": + console.log("Invalid RootFS image") + return 1 + except FileNotFoundError: + console.log("RootFS image not found!") + return 1 + + while True: + try: + adb = adbutils.AdbClient(host="", port=5037) + adb.make_connection() + except adbutils.errors.AdbTimeout: + with console.status("[cyan]Starting adb server", spinner="line", spinner_style="white"): + try: + proc = subprocess.Popen("adb start-server", + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True + ) + stdout, stderr = proc.communicate() + except FileNotFoundError: + console.log("Failed to start adb server") + console.log("Adb binary not found in path") + adb = None + return 1 + else: + if proc.wait() != 0: + console.log("Failed to start adb server") + console.log(stdout) + adb = None + return 1 + else: + break + + username = args.username + while username is None: + username_pattern = r"^[a-z0-9](?!.*[-._?])[a-z0-9]{1,18}[a-z0-9]$" + username = Prompt.ask("Username for linux") + if not re.match(username_pattern, username): + console.log("Incorrect username specified. Please set correct one") + username = None + + password = args.password + while password is None: + password_pattern = r"^[a-z0-9?._-]{1,20}$" + password = Prompt.ask(f"Password for {username}", password=True) + if not re.match(password_pattern, password): + console.log("Incorrect password specified. Please set correct one") + password = None + + linux_part_size = args.part_size + while linux_part_size is None: + linux_part_size = Prompt.ask( + "Size of linux partition (leave empty to skip if possible)", + default="", show_default=False + ) + if linux_part_size == "": + linux_part_size = None + break + elif re.match(r"^\d+%$", linux_part_size) and 20 <= int(linux_part_size[:-1]) <= 90: + break + else: + console.log("Incorrect linux partition size. It can be [20; 90]%") + linux_part_size = None + + fb_list = list_fb_devices() + adb_list = list(map(lambda x: x.serial, adb.list())) + if args.device_serial: + if args.device_serial in fb_list or args.device_serial in adb_list: + serial = args.device_serial + else: + console.log(f"Device with serial {args.device_serial} not found") + return 1 + elif len(fb_list) == 1 and len(adb_list) == 0: + serial = fb_list[0] + elif len(adb_list) == 1 and len(fb_list) == 0: + serial = adb_list[0] + elif len(adb_list + fb_list) == 0: + console.log("No devices available. Please check your device connection") + return 1 + else: + console.log("More then one device detected. Use -s flag to set device") + return 1 + + for msg in [ + f"Username: {username}", + f"Password: {password}", + f"Partition size: {linux_part_size if linux_part_size else 'Not changed'}", + f"Device: {serial}" + ]: + console.log(msg) + + if Prompt.ask("Is it ok?", default="n", choices=["y", "n"]) == "n": + return 1 + + if serial not in fb_list: + console.log("ADB Device detected. Rebooting it to bootloader") + adb.device(serial).shell("reboot bootloader") + with console.status("[cyan]Waiting for fastboot device", spinner="line", spinner_style="white"): + wait_for_bootloader(serial) + console.log("Device connected") + else: + console.log("Device connected") + + if not check_device(serial): + console.log("Is it nabu?") + reboot_fb_device(serial) + return 2 + + parts_status = check_parts(serial) + if linux_part_size: + if Prompt.ask( + f"Repartition {'requested' if parts_status else 'needed'}. All data will be ERASED", + default="n", choices=["y", "n"]) == "y": + console.log("Restoring stock partition table") + restore_parts(serial) + console.log("Booting OrangeFox recovery") + boot_ofox(serial) + with console.status("[cyan]Waiting for device", spinner="line", spinner_style="white"): + try: + adb.wait_for(serial, state="recovery") + except adbutils.errors.AdbTimeout(): + console.log("Could not detect recovery device") + return 1 + repartition(serial, int(linux_part_size.replace("%", "")), percents=True) + console.log("Repartition complete") + adbutils.device(serial).shell("reboot bootloader") + console.log("Rebooting into bootloader") + wait_for_bootloader(serial) + console.log("Booting OrangeFox recovery") + boot_ofox(serial) + with console.status("[cyan]Waiting for device", spinner="line", spinner_style="white"): + try: + adb.wait_for(serial, state="recovery") + except adbutils.errors.AdbTimeout(): + console.log("Could not detect recovery device") + return 1 + with console.status("[cyan]Formating userdata partition", spinner="line", spinner_style="white"): + adbutils.device(serial).shell("twrp format data") + console.log("Userdata partition formated") + adbutils.device(serial).shell("reboot bootloader") + console.log("Rebooting into bootloader") + with console.status("[cyan]Waiting for device", spinner="line", spinner_style="white"): + wait_for_bootloader(serial) + else: + console.log("Repartition canceled. Exiting") + return 1 + + if not parts_status and not linux_part_size: + console.log("Incompatible partition table detected. Repartition needed. Exiting") + return 1 + + console.log("Cleaning linux and esp") + clean_device(serial) + + console.log("Booting OrangeFox recovery") + + boot_ofox(serial) + + with console.status("[cyan]Waiting for device", spinner="line", spinner_style="white"): + try: + adb.wait_for(serial, state="recovery") + except adbutils.errors.AdbTimeout(): + console.log("Could not detect recovery device") + return 1 + + adbd = adb.device(serial) + + with console.status("[cyan]Formating EFI partition", spinner="line", spinner_style="white"): +"mkfs.fat -F32 -s1 /dev/block/platform/soc/1d84000.ufshc/by-name/esp -n ESPNABU") + console.log("EFI partition formated") + server_port = get_port() + nc_thread = threading.Thread( +, + args=(f"busybox nc -l{server_port} > /dev/block/platform/soc/1d84000.ufshc/by-name/linux",), + daemon=True + ) + nc_thread.start() + sleep(3) + console.log("Flashing RootFS") + with adbd.create_connection("tcp", server_port) as conn: + with get_progress() as pbar: + task = pbar.add_task("[cyan]Uploading RootFS", total=op.getsize(rootfs)) + with open(rootfs, "rb") as rootfs: + while True: + data = + conn.send(data) + pbar.update(task, advance=len(data)) + if len(data) < 10240: + break + conn.close() + + nc_thread.join() + + with console.status("[cyan]Setting up user and creating boot files", spinner="line", spinner_style="white"): + if adbd.shell2(f"postinstall {username} {password}").returncode == 0: + console.log("User created") + console.log("Boot files created") + else: + console.log("Postinstall failed. Rebooting to system") + adbd.reboot() + return 4 + + console.log("Installing UEFI") + bootshim = Files.BootShim.get() + payload = Files.UEFI_Payload.get() +"mkdir /tmp/uefi-install") + with get_progress() as pbar: + task = pbar.add_task("[cyan]Pushing uefi files", total=2) + adbd.sync.push(bootshim, f"/tmp/uefi-install/{}") + pbar.update(task, advance=1) + adbd.sync.push(payload, f"/tmp/uefi-install/{}") + pbar.update(task, advance=1) + console.log("Patching boot image") + match adbd.shell2("uefi-patch").returncode: + case 1: + console.log("Failed to patch boot") + return 1 + case 2: + console.log("Boot image already patched. Skipping") + adbd.reboot() + case 0: + patched_size = int("stat -c%s /tmp/uefi-install/new-boot.img")) + with get_progress() as pbar: + task = pbar.add_task("[cyan]Saving patched boot to disk", total=patched_size) + boot_uefi_path = op.join(pwd(), "new_boot.img") + if op.exists(boot_uefi_path): + remove(boot_uefi_path) + with open(boot_uefi_path, "ab") as file: + for chunk in adbd.sync.iter_content("/tmp/uefi-install/new-boot.img"): + file.write(chunk) + pbar.update(task, advance=len(chunk)) + console.log(f"Pathed boot loaded to {boot_uefi_path}") + + backup_size = int("stat -c%s /tmp/uefi-install/boot.img")) + with get_progress() as pbar: + task = pbar.add_task("[cyan]Saving boot backup to disk", total=backup_size) + boot_backup_path = op.join(pwd(), "boot_backup.img") + if op.exists(boot_backup_path): + remove(boot_backup_path) + with open(boot_backup_path, "ab") as file: + for chunk in adbd.sync.iter_content("/tmp/uefi-install/new-boot.img"): + file.write(chunk) + pbar.update(task, advance=len(chunk)) + console.log(f"Boot backup saved to {boot_backup_path}") + + console.log("Rebooting to bootloader") +"reboot bootloader") + wait_for_bootloader(serial) + console.log("Flashing patched boot") + with open(boot_uefi_path, "rb") as file: + flash_boot(serial, + reboot_fb_device(serial) + + console.log("Done!") + return 0 + + +def run() -> None: + global adb + status = main() + with console.status("[cyan]Stopping adb server", spinner="line", spinner_style="white"): + sleep(1) + if adb is not None: + adb.server_kill() + exit(status) + + +if "__main__" == __name__: + run() diff --git a/lon_deployer/ b/lon_deployer/ new file mode 100644 index 0000000..f882cd1 --- /dev/null +++ b/lon_deployer/ @@ -0,0 +1,17 @@ +import PyInstaller.__main__ +from pathlib import Path + +HERE = Path(__file__).parent.parent.absolute() +path_to_main = str(HERE / "") + + +def install(): +[ + path_to_main, + '--onefile', + '--collect-submodules', + '--name LoN Deployer', + "-i", + "NONE", + # other pyinstaller options... + ]) diff --git a/lon_deployer/ b/lon_deployer/ new file mode 100644 index 0000000..a7c6d2a --- /dev/null +++ b/lon_deployer/ @@ -0,0 +1,175 @@ +import logging +import os +import re +import socket +import subprocess +from random import randint +from time import sleep + +import adbutils +from rich.logging import RichHandler +from rich.console import Console +from rich.progress import ( + BarColumn, + DownloadColumn, + Progress, + TextColumn, + TimeRemainingColumn, + TransferSpeedColumn, +) + +from . import Files + + +console = Console(log_path=False) + + +class TimeRemainingColumnCustom(TimeRemainingColumn): + max_refresh = 1 + + +FORMAT = "%(message)s" +logging.basicConfig( + level="INFO", format=FORMAT, datefmt="[%X]", handlers=[RichHandler()] +) + +logger = logging.getLogger("Deployer") + + +class DeviceNotFound(Exception): + pass + + +def get_progress() -> Progress: + return Progress( + TextColumn("[bold blue]{task.description}", justify="right"), + BarColumn(bar_width=None), + "[progress.percentage]{task.percentage:>3.1f}%", + "•", + DownloadColumn(), + "•", + TransferSpeedColumn(), + "•", + TimeRemainingColumnCustom(), + ) + + +def fastboot_run(command: [str], serial: str = None) -> str: + + try: + if not serial: + cmd = f"fastboot {' '.join(command)}" + else: + cmd = f"fastboot -s {serial} {' '.join(command)}" + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + except FileNotFoundError: + console.log("Fastboot binary not found") + console.log("Exiting") + exit(1) + else: + stdout, stderr = proc.communicate() + proc.wait() + return (stdout if stdout else stderr).decode() + + +def list_fb_devices() -> [str]: + return [x.split("\t ")[0] for x in fastboot_run(["devices"]).split("\n")[:-1]] + + +def check_device(serial: str) -> bool: + return "nabu" in fastboot_run(["getvar", "product"], serial=serial) + + +def check_parts(serial: str) -> bool: + linux_response = fastboot_run(["getvar", "partition-type:linux"], serial=serial) + esp_response = fastboot_run(["getvar", "partition-type:esp"], serial=serial) + return linux_response != "FAILED" and esp_response != "FAILED" + + +def reboot_fb_device(serial: str) -> None: + fastboot_run(["reboot"], serial=serial) + + +def check_port(tcp_port: int) -> bool: + s = socket.socket() + try: + s.bind(("", tcp_port)) + except OSError as e: + if e.errno == 98: + return False + return True + + +def get_port() -> int: + while True: + tcp_port = randint(10000, 60000) + if check_port(tcp_port): + return tcp_port + + +def boot_ofox(serial: str) -> None: + Files.OrangeFox.get() + ofox = Files.OrangeFox.filepath + with console.status("[cyan]Booting", spinner="line", spinner_style="white"): + fastboot_run(["boot", ofox]) + + +def flash_boot(serial: str, boot_data: bytes) -> None: + proper_flash(serial, "boot", boot_data) + + +def proper_flash(serial: str, part: str, data: bytes) -> None: + with open("image.img", "wb") as file: + file.write(data) + + with console.status(f"[cyan]Flashing {part}", spinner="line", spinner_style="white"): + fastboot_run(["flash", part, "image.img"], serial=serial) + + os.remove("image.img") + + +def restore_parts(serial: str) -> None: + gpt_both = Files.GPT_Both0.get() + userdata = Files.UserData_Empty.get() + proper_flash(serial, "partition:0", gpt_both) + proper_flash(serial, "userdata", userdata) + + +def clean_device(serial: str) -> None: + fastboot_run(["erase", "linux"], serial=serial) + fastboot_run(["erase", "esp"], serial=serial) + + +def repartition(serial: str, size: int, percents=False) -> None: + device = adbutils.adb.device(serial) + block_size ="blockdev --getsize64 /dev/block/sda") + if re.match(r"^125[0-9]{9}$", block_size): + maxsize = 126 + elif re.match(r"^253[0-9]{9}$", block_size): + maxsize = 254 + else: + logger.error("Weird block size. "poetry.core.masonry.api" diff --git a/ b/ new file mode 100644 index 0000000..59a6cba --- /dev/null +++ b/ @@ -0,0 +1,2 @@ +from lon_deployer import main +main.main() diff --git a/tests/ b/tests/ new file mode 100644 index 0000000..e69de29 diff --git a/tests/ b/tests/ new file mode 100644 index 0000000..e1a8b5e --- /dev/null +++ b/tests/ @@ -0,0 +1,8 @@ +from lon_deployer import Files + + +def test_ofox() -> None: + file = Files.OrangeFox + assert == "orangefox.img" + assert file.md5sum() == "3edc8c32db0384006caf8cf066257811" +