Add abfs mc plugin

This commit is contained in:
timoxa0 2024-11-14 09:35:33 +05:00
parent 698a24a072
commit 5926f73afa

691
dot-local/share/mc/extfs.d/adbfs Executable file
View file

@ -0,0 +1,691 @@
#! /usr/bin/env python3
"""
adbfs Virtual filesystem for Midnight Commander
* Copyright (c) 2016, Roman Dobosz,
* Published under 3-clause BSD-style license (see LICENSE file)
"""
import configparser
import argparse
from datetime import datetime
import json
import os
import re
import subprocess
import sys
import shlex
__version__ = 0.14
XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
def check_output(command_list, stderr=None):
"""
For some reason, in py3 it was decided that command output should be bytes
instead of string. This little function will check if we have string or
bytes and in case of bytes it will convert it to string.
"""
result = subprocess.check_output(command_list, stderr=stderr)
if not isinstance(result, str):
_result = []
for t in result.split(b'\n'):
if not t:
continue
try:
_result.append(t.decode('utf-8'))
except UnicodeDecodeError:
_result.append(t.decode('iso-8859-1'))
result = '\n'.join(_result) + '\n'
return result
class Conf(object):
"""Simple config parser"""
boxes = {'busybox': {'ls': 'busybox ls -anel',
'rls': 'busybox ls -Ranel {}',
'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<links>\d+)\s'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)\s[A-Z,a-z]{3}\s'
r'(?P<date_time>[A-Z,a-z]{3}\s+'
r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s'
r'(?P<name>.*)'},
'toolbox': {'ls': 'toolbox ls -anl',
'rls': 'toolbox ls -Ranl {}',
'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)?\s'
r'(?P<date>\d{4}-\d{2}-\d{2}\s'
r'\d{2}:\d{2})\s'
r'(?P<name>.*)'},
'toybox': {'ls': 'toybox ls -anl',
'rls': 'toybox ls -Ranl {}',
'file_re': r'^(?P<perms>[-bcdlps][-rwxsStT]{9})\s+'
r'(?P<links>\d+)\s+'
r'(?P<uid>\d+)\s+'
r'(?P<gid>\d+)\s+'
r'(?P<size>\d+)?\s'
r'(?P<date>\d{4}-\d{2}-\d{2}\s'
r'\d{2}:\d{2})\s'
r'(?P<name>.*)'}}
def __init__(self):
self.box = None
self.debug = False
self.dirs_to_skip = ['acct', 'charger', 'd', 'dev', 'proc', 'sys']
self.root = None
self.suppress_colors = False
self.adb_command = 'adb'
self.adb_connect = ''
self.try_su = False
self.read()
self.connect()
self.get_the_box()
def get_the_box(self):
"""Detect if we dealing with busybox or toolbox"""
cmd = [self.adb_command] + 'shell which'.split()
try:
with open(os.devnull, 'w') as fnull:
result = check_output(cmd + ['busybox'], stderr=fnull)
if 'busybox' in result:
self.box = Conf.boxes['busybox']
if self.suppress_colors:
self.box.update({'ls': 'busybox ls --color=none -anel',
'rls': 'busybox ls --color=none '
'-Ranel {}'})
Adb.file_re = re.compile(self.box['file_re'])
return
except subprocess.CalledProcessError:
pass
try:
with open(os.devnull, 'w') as fnull:
result = check_output(cmd + ['toybox'], stderr=fnull)
if 'toybox' in result:
self.box = Conf.boxes['toybox']
Adb.file_re = re.compile(self.box['file_re'])
return
except subprocess.CalledProcessError:
pass
try:
with open(os.devnull, 'w') as fnull:
result = check_output(cmd + ['toolbox'], stderr=fnull)
if 'toolbox' in result:
self.box = Conf.boxes['toolbox']
Adb.file_re = re.compile(self.box['file_re'])
return
except subprocess.CalledProcessError:
pass
sys.stderr.write('There is no toolbox or busybox available.\n')
sys.exit(1)
def get_attached_devices(self):
"""Return a list of attached devices"""
cmd = [self.adb_command, 'devices']
devices = []
try:
with open(os.devnull, 'w') as fnull:
result = check_output(cmd, stderr=fnull)
except subprocess.CalledProcessError:
result = ''
for line in result.split('\n'):
if line.startswith('*'):
continue
if line.strip() == 'List of devices attached':
continue
if line.strip() == '':
continue
identifier, _ = line.split()
devices.append(identifier)
return devices
def connect(self):
"""
If adb_connect is non empty string, perform connecting to specified
device over network using an address (or hostname).
"""
if not self.adb_connect:
return
devices = self.get_attached_devices()
for device in devices:
if self.adb_connect in device:
return # already connected, no need to reconnect
cmd = [self.adb_command, 'connect', self.adb_connect]
with open(os.devnull, 'w') as fnull:
result = check_output(cmd, stderr=fnull)
if result.split()[0] == 'connected':
subprocess.call([self.adb_command, 'wait-for-device'])
return
sys.stderr.write('Unable to connect to `%s\'. Is adb over network '
'enabled on device?\n' % self.adb_connect)
sys.exit(2)
def read(self):
"""
Read config file and change the options according to values from that
file.
"""
if not os.path.exists(XDG_CONFIG_HOME):
return
conf_fname = os.path.join(XDG_CONFIG_HOME, 'mc', 'adbfs.ini')
if not os.path.exists(conf_fname):
return
cfg = configparser.ConfigParser()
cfg_map = {'debug': (cfg.getboolean, 'debug'),
'dirs_to_skip': (cfg.get, 'dirs_to_skip'),
'suppress_colors': (cfg.get, 'suppress_colors'),
'root': (cfg.get, 'root'),
'adb_command': (cfg.get, 'adb_command'),
'adb_connect': (cfg.get, 'adb_connect'),
'try_su': (cfg.getboolean, 'try_su')}
cfg.read(conf_fname)
for key, (function, attr) in cfg_map.items():
try:
setattr(self, attr, function('adbfs', key))
except (configparser.NoSectionError, configparser.NoOptionError):
pass
if self.dirs_to_skip and isinstance(self.dirs_to_skip, str):
self.dirs_to_skip = json.loads(self.dirs_to_skip)
self.dirs_to_skip = [x.encode('utf-8') for x in self.dirs_to_skip]
else:
self.dirs_to_skip = []
if self.adb_command:
self.adb_command = os.path.expandvars(self.adb_command)
self.adb_command = os.path.expanduser(self.adb_command)
class File(object):
"""Item in filesystem representation"""
def __init__(self, perms=None, links=1, uid=0, gid=0, size=None,
date_time=None, date=None, name=None):
"""initialize file"""
self.perms = perms
self.links = links
self.uid = uid
self.gid = gid
self.size = size if size else 0
self.date_time = date_time # as string
self.name = name
self.date = date # as string
self.dirname = ''
self.type = None
self.string = None
self.link_target = None
self.filepath = None
def _correct_link(self):
"""Canonize filename and fill the link attr"""
try:
name, target = self.name.split(' -> ')
except ValueError:
return
self.name = name
if not self.size:
self.size = 0
if target.startswith('/'):
self.link_target = target
else:
self.link_target = os.path.abspath(os.path.join(self.dirname,
target))
def update(self, dirname):
"""update object fields"""
month_num = {'Jan': 1,
'Feb': 2,
'Mar': 3,
'Apr': 4,
'May': 5,
'Jun': 6,
'Jul': 7,
'Aug': 8,
'Sep': 9,
'Oct': 10,
'Nov': 11,
'Dec': 12}
self.dirname = dirname
if self.date_time:
date = self.date_time.split()
date = '%s-%02d-%s %s' % (date[1],
month_num[date[0]],
date[3],
date[2])
date = datetime.strptime(date, '%d-%m-%Y %H:%M:%S')
elif self.date:
date = datetime.strptime(self.date, '%Y-%m-%d %H:%M')
self.date_time = date.strftime('%m/%d/%Y %H:%M:01')
self.type = self.perms[0] if self.perms else None
if self.type == 'l' and ' -> ' in self.name:
self._correct_link()
self.filepath = os.path.join(self.dirname, self.name)
def mk_link_relative(self):
"""Convert links to relative"""
self.link_target = os.path.relpath(self.link_target, self.dirname)
def __repr__(self):
"""represent the file/entire node"""
fullname = os.path.join(self.dirname, self.name)
if self.link_target:
fullname += ' -> ' + self.link_target
return '<File {type} {name} {id}>'.format(type=self.type,
name=fullname,
id=hex(id(self)))
def __str__(self):
"""display the file/entire node"""
template = ('{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} '
'{date_time} {fullname}\n')
if not self.name:
return ''
fullname = os.path.join(self.dirname, self.name)
if self.link_target:
fullname += ' -> ' + self.link_target
return template.format(perms=self.perms,
links=self.links,
uid=self.uid,
gid=self.gid,
size=self.size,
date_time=self.date_time,
fullname=fullname)
class Adb(object):
"""Class for interact with android rooted device through adb"""
file_re = None
current_re = re.compile(r'^(\./)?(?P<dir>.+):$')
def __init__(self):
"""Prepare archive content for operations"""
super(Adb, self).__init__()
self.conf = Conf()
self.error = ''
self._entries = []
self._links = {}
self._got_root = False
if self.conf.try_su:
self.__su_check()
def _shell_cmd(self, with_root, *args):
cmd = [self.conf.adb_command, 'shell']
if with_root and self._got_root:
_args = [shlex.quote(x) for x in args]
cmd += ['su', '-c', shlex.quote(' '.join(_args))]
else:
cmd += args
return cmd
def __su_check(self):
"""Check if we are able to get elevated privileges"""
cmd = self._shell_cmd(False, 'su -c whoami')
try:
with open(os.devnull, 'w') as fnull:
result = check_output(cmd, stderr=fnull)
except subprocess.CalledProcessError:
return
if 'root' in result:
self._got_root = True
def _find_target(self, needle):
"""Find link target"""
if needle in self._links:
elem = self._links[needle]
target = os.path.abspath(os.path.join(elem.dirname,
elem.link_target))
return self._find_target(target)
for entry in self._entries:
if entry.filepath == needle:
return entry
return None
def _normalize_links(self):
"""
There might be a case of a chain of linked files, like:
/foo -> /mnt/foo
/bar -> /foo
If one want to follow such 'bar' link - MC in extfs mode will fail to
figure out the right target. This helper will correct the thing.
"""
elems_to_remove = []
for entry in self._links.values():
target_entry = self._find_target(entry.link_target)
if target_entry:
entry.link_target = target_entry.filepath
entry.mk_link_relative()
else:
elems_to_remove.append(self._entries.index(entry))
for idx in sorted(elems_to_remove, reverse=True):
del self._entries[idx]
def _retrieve_single_dir_list(self, dir_):
"""Retrieve file list using adb"""
lscmd = self.conf.box['rls'].format(shlex.quote(dir_))
command = self._shell_cmd(True, *shlex.split(lscmd))
try:
if self.conf.debug:
print('executing', ' '.join(command))
lines = check_output(command)
except subprocess.CalledProcessError:
sys.stderr.write('Cannot read directory. Is device connected?\n')
return 1
lines = [l.strip() for l in lines.split('\n') if l.strip()]
if len(lines) == 1:
reg_match = self.file_re.match(lines[0])
entry = File(**reg_match.groupdict())
entry.update('/')
if entry.filepath in self.conf.dirs_to_skip:
return
self._entries.append(entry)
if entry.type == 'l':
self._links[entry.filepath] = entry
self._retrieve_single_dir_list(entry.link_target)
else:
for line in lines:
current_dir_re = self.current_re.match(line)
if current_dir_re:
current_dir = current_dir_re.groupdict()['dir']
if not current_dir:
current_dir = '/'
continue
reg_match = self.file_re.match(line)
if not reg_match:
continue
entry = File(**reg_match.groupdict())
if entry.name in ('.', '..'):
continue
entry.update(current_dir)
if entry.filepath in self.conf.dirs_to_skip:
continue
self._entries.append(entry)
if entry.type == 'l':
self._links[entry.filepath] = entry
def _retrieve_file_list(self, root=None):
"""Retrieve file list using adb"""
if not root:
lscmd = self.conf.box['ls']
else:
lscmd = self.conf.box['rls'].format(shlex.quote(root.filepath))
command = self._shell_cmd(True, *shlex.split(lscmd))
try:
if self.conf.debug:
print('executing', ' '.join(command))
lines = check_output(command)
except subprocess.CalledProcessError:
sys.stderr.write('Cannot read directory. Is device connected?\n')
return 2
current_dir = root.dirname if root else '/'
for line in lines.split('\n'):
line = line.strip()
current_dir_re = self.current_re.match(line)
if current_dir_re:
current_dir = current_dir_re.groupdict()['dir']
if not current_dir:
current_dir = '/'
continue
reg_match = self.file_re.match(line)
if not reg_match:
continue
entry = File(**reg_match.groupdict())
if entry.name in ('.', '..'):
continue
entry.update(current_dir)
if entry.filepath in self.conf.dirs_to_skip:
continue
self._entries.append(entry)
if root is None and entry.type == 'd':
self._retrieve_file_list(entry)
if entry.type == 'l':
self._links[entry.filepath] = entry
def run(self, fname):
"""Not supported"""
sys.stderr.write('Not supported - or maybe you are on compatible '
'architecture?\n')
return 3
def list(self):
"""Output list contents directory"""
if self.error:
sys.stderr.write(self.error)
return 4
if self.conf.root:
self._retrieve_single_dir_list(self.conf.root)
else:
self._retrieve_file_list()
self._normalize_links()
sys.stdout.write(''.join([str(entry) for entry in self._entries]))
return 0
def copyout(self, src, dst):
"""Copy file form the device using adb."""
if self.error:
sys.stderr.write(self.error)
return 5
cmd = [self.conf.adb_command, 'pull', src, dst]
if self.conf.debug:
sys.stderr.write(' '.join(cmd) + '\n')
with open(os.devnull, 'w') as fnull:
try:
err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 6
return err
def copyin(self, src, dst):
"""Copy file to the device through adb."""
if self.error:
sys.stderr.write(self.error)
return 7
if not dst.startswith('/'):
dst = '/' + dst
cmd = [self.conf.adb_command, 'push', src, dst]
if self.conf.debug:
sys.stderr.write(' '.join(cmd) + '\n')
with open(os.devnull, 'w') as fnull:
try:
err = subprocess.call(cmd, stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 8
if err != 0:
sys.stderr.write('Cannot push the file, '
'%s, error %d' % (dst, err))
return 9
return 0
def rm(self, dst):
"""Remove file from device."""
if self.error:
sys.stderr.write(self.error)
return 10
cmd = self._shell_cmd(False, 'rm %s' % shlex.quote(dst))
try:
err = check_output(cmd).strip()
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 11
if err != '':
sys.stderr.write(err)
return 12
return 0
def rmdir(self, dst):
"""Remove directory from device."""
if self.error:
sys.stderr.write(self.error)
return 13
cmd = self._shell_cmd(False, 'rm -r %s' % shlex.quote(dst))
try:
err = check_output(cmd).strip()
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 14
if err != '':
sys.stderr.write(err)
return 15
return 0
def mkdir(self, dst):
"""Make directory on the device through adb."""
if self.error:
sys.stderr.write(self.error)
return 16
if not dst.startswith('/'):
dst = '/' + dst
cmd = self._shell_cmd(False, 'mkdir %s' % shlex.quote(dst))
try:
err = check_output(cmd).strip()
except subprocess.CalledProcessError:
sys.stderr.write('Error executing adb shell')
return 17
if err != '':
sys.stderr.write(err)
return 18
return 0
CALL_MAP = {'list': lambda a: Adb().list(),
'copyin': lambda a: Adb().copyin(a.src, a.dst),
'copyout': lambda a: Adb().copyout(a.src, a.dst),
'mkdir': lambda a: Adb().mkdir(a.dst),
'rmdir': lambda a: Adb().rmdir(a.dst),
'rm': lambda a: Adb().rm(a.dst),
'run': lambda a: Adb().run(a.dst)}
def main():
"""parse commandline"""
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help='supported commands')
parser_list = subparsers.add_parser('list')
parser_copyin = subparsers.add_parser('copyin')
parser_copyout = subparsers.add_parser('copyout')
parser_rm = subparsers.add_parser('rm')
parser_mkdir = subparsers.add_parser('mkdir')
parser_rmdir = subparsers.add_parser('rmdir')
parser_run = subparsers.add_parser('run')
parser_list.add_argument('arch')
parser_list.set_defaults(func=CALL_MAP['list'])
parser_copyin.add_argument('arch')
parser_copyin.add_argument('dst')
parser_copyin.add_argument('src')
parser_copyin.set_defaults(func=CALL_MAP['copyin'])
parser_copyout.add_argument('arch')
parser_copyout.add_argument('src')
parser_copyout.add_argument('dst')
parser_copyout.set_defaults(func=CALL_MAP['copyout'])
parser_rm.add_argument('arch')
parser_rm.add_argument('dst')
parser_rm.set_defaults(func=CALL_MAP['rm'])
parser_mkdir.add_argument('arch')
parser_mkdir.add_argument('dst')
parser_mkdir.set_defaults(func=CALL_MAP['mkdir'])
parser_rmdir.add_argument('arch')
parser_rmdir.add_argument('dst')
parser_rmdir.set_defaults(func=CALL_MAP['rmdir'])
parser_run.add_argument('arch')
parser_run.add_argument('dst')
parser_run.set_defaults(func=CALL_MAP['run'])
parser.add_argument('--version', action='version',
version='%(prog)s ' + str(__version__))
args = parser.parse_args()
try:
return args.func(args)
except AttributeError:
parser.print_help()
parser.exit()
if __name__ == '__main__':
sys.exit(main())