Move fastboot funcs to fastboot.py

This commit is contained in:
timoxa0 2024-05-13 19:30:34 +05:00
parent 105aacf527
commit 6524b921eb
2 changed files with 90 additions and 91 deletions

86
lon_deployer/fastboot.py Normal file
View file

@ -0,0 +1,86 @@
from . import files, exceptions
from .utils import logger, console
import os
import subprocess
def _fastboot_run(command: [str], serial: str = None) -> str:
try:
if not serial:
cmd = f"fastboot {' '.join(command)}"
else:
cmd = f"fastboot -s {serial} {' '.join(command)}"
fb_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True, timeout=60)
except FileNotFoundError:
console.log("Fastboot binary not found")
console.log("Exiting")
exit(1)
except subprocess.CalledProcessError:
raise exceptions.DeviceNotFound("Timed out")
else:
logger.debug(f"fb-cmd: {cmd}")
logger.debug(f"fb-out: {fb_out}")
return fb_out.decode()
def list_devices() -> [str]:
return list(
filter(
lambda x: x != "",
[x.split("\t ")[0] for x in _fastboot_run(["devices"]).split("\n")]
)
)
def check_device(serial: str) -> bool:
return "nabu" in _fastboot_run(["getvar", "product"], serial=serial)
def check_parts(serial: str) -> bool:
linux_response = _fastboot_run(["getvar", "partition-type:linux"], serial=serial)
esp_response = _fastboot_run(["getvar", "partition-type:esp"], serial=serial)
logger.debug({
"esp": 'FAILED' not in esp_response,
"linux": 'FAILED' not in linux_response
})
return not ("FAILED" in linux_response or "FAILED" in esp_response)
def reboot(serial: str) -> None:
_fastboot_run(["reboot"], serial=serial)
def boot_ofox(serial: str) -> None:
files.OrangeFox.get()
ofox = files.OrangeFox.filepath
with console.status("[cyan]Booting", spinner="line", spinner_style="white"):
out = _fastboot_run(["boot", ofox], serial)
if "Failed to load/authenticate boot image: Device Error" in out:
raise exceptions.FastbootException("Failed to load/authenticate boot image: Device Error")
def flash(serial: str, part: str, data: bytes) -> None:
with open("image.img", "wb") as file:
file.write(data)
with console.status(f"[cyan]Flashing {part}", spinner="line", spinner_style="white"):
_fastboot_run(["flash", part, "image.img"], serial=serial)
os.remove("image.img")
def restore_parts(serial: str) -> None:
gpt_both = files.GPT_Both0.get()
userdata = files.UserData_Empty.get()
flash(serial, "partition:0", gpt_both)
flash(serial, "userdata", userdata)
def clean_device(serial: str) -> None:
_fastboot_run(["erase", "linux"], serial=serial)
_fastboot_run(["erase", "esp"], serial=serial)
def wait_for_bootloader(serial: str) -> None:
_fastboot_run(["getvar", "product"], serial=serial)

View file

@ -1,11 +1,11 @@
import logging import logging
import os
import re import re
import socket import socket
import subprocess
from random import randint from random import randint
from time import sleep from time import sleep
from . import exceptions
import adbutils import adbutils
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.console import Console from rich.console import Console
@ -18,9 +18,6 @@ from rich.progress import (
TransferSpeedColumn, TransferSpeedColumn,
) )
from . import Files
console = Console(log_path=False) console = Console(log_path=False)
@ -36,10 +33,6 @@ logging.basicConfig(
logger = logging.getLogger("Deployer") logger = logging.getLogger("Deployer")
class DeviceNotFound(Exception):
pass
def get_progress() -> Progress: def get_progress() -> Progress:
return Progress( return Progress(
TextColumn("[bold blue]{task.description}", justify="right"), TextColumn("[bold blue]{task.description}", justify="right"),
@ -54,49 +47,6 @@ def get_progress() -> Progress:
) )
def fastboot_run(command: [str], serial: str = None) -> str:
try:
if not serial:
cmd = f"fastboot {' '.join(command)}"
else:
cmd = f"fastboot -s {serial} {' '.join(command)}"
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
except FileNotFoundError:
console.log("Fastboot binary not found")
console.log("Exiting")
exit(1)
else:
stdout, stderr = proc.communicate()
proc.wait()
logger.debug(f"fb-cmd: {cmd}")
logger.debug(f"fb-out: {stdout}")
logger.debug(f"fb-err: {stderr}")
return (stdout if stdout else stderr).decode()
def list_fb_devices() -> [str]:
return [x.split("\t ")[0] for x in fastboot_run(["devices"]).split("\n")[:-1]]
def check_device(serial: str) -> bool:
return "nabu" in fastboot_run(["getvar", "product"], serial=serial)
def check_parts(serial: str) -> bool:
linux_response = fastboot_run(["getvar", "partition-type:linux"], serial=serial)
esp_response = fastboot_run(["getvar", "partition-type:esp"], serial=serial)
logger.debug({
"esp": 'FAILED' not in esp_response,
"linux": 'FAILED' not in linux_response
})
return not ("FAILED" in linux_response or "FAILED" in esp_response)
def reboot_fb_device(serial: str) -> None:
fastboot_run(["reboot"], serial=serial)
def check_port(tcp_port: int) -> bool: def check_port(tcp_port: int) -> bool:
s = socket.socket() s = socket.socket()
try: try:
@ -114,39 +64,6 @@ def get_port() -> int:
return tcp_port return tcp_port
def boot_ofox(serial: str) -> None:
Files.OrangeFox.get()
ofox = Files.OrangeFox.filepath
with console.status("[cyan]Booting", spinner="line", spinner_style="white"):
fastboot_run(["boot", ofox])
def flash_boot(serial: str, boot_data: bytes) -> None:
proper_flash(serial, "boot", boot_data)
def proper_flash(serial: str, part: str, data: bytes) -> None:
with open("image.img", "wb") as file:
file.write(data)
with console.status(f"[cyan]Flashing {part}", spinner="line", spinner_style="white"):
fastboot_run(["flash", part, "image.img"], serial=serial)
os.remove("image.img")
def restore_parts(serial: str) -> None:
gpt_both = Files.GPT_Both0.get()
userdata = Files.UserData_Empty.get()
proper_flash(serial, "partition:0", gpt_both)
proper_flash(serial, "userdata", userdata)
def clean_device(serial: str) -> None:
fastboot_run(["erase", "linux"], serial=serial)
fastboot_run(["erase", "esp"], serial=serial)
def repartition(serial: str, size: int, percents=False) -> None: def repartition(serial: str, size: int, percents=False) -> None:
device = adbutils.adb.device(serial) device = adbutils.adb.device(serial)
block_size = device.shell("blockdev --getsize64 /dev/block/sda") block_size = device.shell("blockdev --getsize64 /dev/block/sda")
@ -156,7 +73,7 @@ def repartition(serial: str, size: int, percents=False) -> None:
maxsize = 254 maxsize = 254
else: else:
logger.error("Weird block size. Is it nabu?") logger.error("Weird block size. Is it nabu?")
exit(4) raise exceptions.RepartitonError()
linux_max = maxsize - 12 linux_max = maxsize - 12
if percents: if percents:
size = round(linux_max / 100 * size, 2) size = round(linux_max / 100 * size, 2)
@ -175,8 +92,4 @@ def repartition(serial: str, size: int, percents=False) -> None:
] ]
for cmd in cmds: for cmd in cmds:
device.shell(cmd) device.shell(cmd)
sleep(3) sleep(1)
def wait_for_bootloader(serial: str) -> None:
fastboot_run(["getvar", "product"], serial=serial)