diff --git a/dot-local/share/mc/extfs.d/adbfs b/dot-local/share/mc/extfs.d/adbfs new file mode 100755 index 0000000..3343730 --- /dev/null +++ b/dot-local/share/mc/extfs.d/adbfs @@ -0,0 +1,691 @@ +#! /usr/bin/env python3 +""" +adbfs Virtual filesystem for Midnight Commander + +* Copyright (c) 2016, Roman Dobosz, +* Published under 3-clause BSD-style license (see LICENSE file) +""" +import configparser +import argparse +from datetime import datetime +import json +import os +import re +import subprocess +import sys +import shlex + +__version__ = 0.14 + +XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config')) + + +def check_output(command_list, stderr=None): + """ + For some reason, in py3 it was decided that command output should be bytes + instead of string. This little function will check if we have string or + bytes and in case of bytes it will convert it to string. + """ + result = subprocess.check_output(command_list, stderr=stderr) + if not isinstance(result, str): + _result = [] + for t in result.split(b'\n'): + if not t: + continue + try: + _result.append(t.decode('utf-8')) + except UnicodeDecodeError: + _result.append(t.decode('iso-8859-1')) + result = '\n'.join(_result) + '\n' + return result + + +class Conf(object): + """Simple config parser""" + boxes = {'busybox': {'ls': 'busybox ls -anel', + 'rls': 'busybox ls -Ranel {}', + 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' + r'(?P\d+)\s' + r'(?P\d+)\s+' + r'(?P\d+)\s+' + r'(?P\d+)\s[A-Z,a-z]{3}\s' + r'(?P[A-Z,a-z]{3}\s+' + r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s' + r'(?P.*)'}, + 'toolbox': {'ls': 'toolbox ls -anl', + 'rls': 'toolbox ls -Ranl {}', + 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' + r'(?P\d+)\s+' + r'(?P\d+)\s+' + r'(?P\d+)?\s' + r'(?P\d{4}-\d{2}-\d{2}\s' + r'\d{2}:\d{2})\s' + r'(?P.*)'}, + 'toybox': {'ls': 'toybox ls -anl', + 'rls': 'toybox ls -Ranl {}', + 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' + r'(?P\d+)\s+' + r'(?P\d+)\s+' + r'(?P\d+)\s+' + r'(?P\d+)?\s' + r'(?P\d{4}-\d{2}-\d{2}\s' + r'\d{2}:\d{2})\s' + r'(?P.*)'}} + + def __init__(self): + self.box = None + self.debug = False + self.dirs_to_skip = ['acct', 'charger', 'd', 'dev', 'proc', 'sys'] + self.root = None + self.suppress_colors = False + self.adb_command = 'adb' + self.adb_connect = '' + self.try_su = False + + self.read() + self.connect() + self.get_the_box() + + def get_the_box(self): + """Detect if we dealing with busybox or toolbox""" + cmd = [self.adb_command] + 'shell which'.split() + try: + with open(os.devnull, 'w') as fnull: + result = check_output(cmd + ['busybox'], stderr=fnull) + if 'busybox' in result: + self.box = Conf.boxes['busybox'] + if self.suppress_colors: + self.box.update({'ls': 'busybox ls --color=none -anel', + 'rls': 'busybox ls --color=none ' + '-Ranel {}'}) + Adb.file_re = re.compile(self.box['file_re']) + return + except subprocess.CalledProcessError: + pass + + try: + with open(os.devnull, 'w') as fnull: + result = check_output(cmd + ['toybox'], stderr=fnull) + + if 'toybox' in result: + self.box = Conf.boxes['toybox'] + Adb.file_re = re.compile(self.box['file_re']) + return + except subprocess.CalledProcessError: + pass + + try: + with open(os.devnull, 'w') as fnull: + result = check_output(cmd + ['toolbox'], stderr=fnull) + + if 'toolbox' in result: + self.box = Conf.boxes['toolbox'] + Adb.file_re = re.compile(self.box['file_re']) + return + except subprocess.CalledProcessError: + pass + + sys.stderr.write('There is no toolbox or busybox available.\n') + sys.exit(1) + + def get_attached_devices(self): + """Return a list of attached devices""" + cmd = [self.adb_command, 'devices'] + devices = [] + + try: + with open(os.devnull, 'w') as fnull: + result = check_output(cmd, stderr=fnull) + except subprocess.CalledProcessError: + result = '' + + for line in result.split('\n'): + if line.startswith('*'): + continue + if line.strip() == 'List of devices attached': + continue + if line.strip() == '': + continue + identifier, _ = line.split() + devices.append(identifier) + + return devices + + def connect(self): + """ + If adb_connect is non empty string, perform connecting to specified + device over network using an address (or hostname). + """ + if not self.adb_connect: + return + + devices = self.get_attached_devices() + + for device in devices: + if self.adb_connect in device: + return # already connected, no need to reconnect + + cmd = [self.adb_command, 'connect', self.adb_connect] + with open(os.devnull, 'w') as fnull: + result = check_output(cmd, stderr=fnull) + if result.split()[0] == 'connected': + subprocess.call([self.adb_command, 'wait-for-device']) + return + + sys.stderr.write('Unable to connect to `%s\'. Is adb over network ' + 'enabled on device?\n' % self.adb_connect) + sys.exit(2) + + def read(self): + """ + Read config file and change the options according to values from that + file. + """ + if not os.path.exists(XDG_CONFIG_HOME): + return + + conf_fname = os.path.join(XDG_CONFIG_HOME, 'mc', 'adbfs.ini') + if not os.path.exists(conf_fname): + return + + cfg = configparser.ConfigParser() + cfg_map = {'debug': (cfg.getboolean, 'debug'), + 'dirs_to_skip': (cfg.get, 'dirs_to_skip'), + 'suppress_colors': (cfg.get, 'suppress_colors'), + 'root': (cfg.get, 'root'), + 'adb_command': (cfg.get, 'adb_command'), + 'adb_connect': (cfg.get, 'adb_connect'), + 'try_su': (cfg.getboolean, 'try_su')} + cfg.read(conf_fname) + + for key, (function, attr) in cfg_map.items(): + try: + setattr(self, attr, function('adbfs', key)) + except (configparser.NoSectionError, configparser.NoOptionError): + pass + + if self.dirs_to_skip and isinstance(self.dirs_to_skip, str): + self.dirs_to_skip = json.loads(self.dirs_to_skip) + self.dirs_to_skip = [x.encode('utf-8') for x in self.dirs_to_skip] + else: + self.dirs_to_skip = [] + + if self.adb_command: + self.adb_command = os.path.expandvars(self.adb_command) + self.adb_command = os.path.expanduser(self.adb_command) + + +class File(object): + """Item in filesystem representation""" + def __init__(self, perms=None, links=1, uid=0, gid=0, size=None, + date_time=None, date=None, name=None): + """initialize file""" + self.perms = perms + self.links = links + self.uid = uid + self.gid = gid + self.size = size if size else 0 + self.date_time = date_time # as string + self.name = name + self.date = date # as string + + self.dirname = '' + self.type = None + self.string = None + self.link_target = None + self.filepath = None + + def _correct_link(self): + """Canonize filename and fill the link attr""" + try: + name, target = self.name.split(' -> ') + except ValueError: + return + + self.name = name + + if not self.size: + self.size = 0 + + if target.startswith('/'): + self.link_target = target + else: + self.link_target = os.path.abspath(os.path.join(self.dirname, + target)) + + def update(self, dirname): + """update object fields""" + month_num = {'Jan': 1, + 'Feb': 2, + 'Mar': 3, + 'Apr': 4, + 'May': 5, + 'Jun': 6, + 'Jul': 7, + 'Aug': 8, + 'Sep': 9, + 'Oct': 10, + 'Nov': 11, + 'Dec': 12} + self.dirname = dirname + if self.date_time: + date = self.date_time.split() + date = '%s-%02d-%s %s' % (date[1], + month_num[date[0]], + date[3], + date[2]) + date = datetime.strptime(date, '%d-%m-%Y %H:%M:%S') + elif self.date: + date = datetime.strptime(self.date, '%Y-%m-%d %H:%M') + + self.date_time = date.strftime('%m/%d/%Y %H:%M:01') + + self.type = self.perms[0] if self.perms else None + + if self.type == 'l' and ' -> ' in self.name: + self._correct_link() + + self.filepath = os.path.join(self.dirname, self.name) + + def mk_link_relative(self): + """Convert links to relative""" + self.link_target = os.path.relpath(self.link_target, self.dirname) + + def __repr__(self): + """represent the file/entire node""" + fullname = os.path.join(self.dirname, self.name) + if self.link_target: + fullname += ' -> ' + self.link_target + return ''.format(type=self.type, + name=fullname, + id=hex(id(self))) + + def __str__(self): + """display the file/entire node""" + template = ('{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} ' + '{date_time} {fullname}\n') + + if not self.name: + return '' + + fullname = os.path.join(self.dirname, self.name) + if self.link_target: + fullname += ' -> ' + self.link_target + + return template.format(perms=self.perms, + links=self.links, + uid=self.uid, + gid=self.gid, + size=self.size, + date_time=self.date_time, + fullname=fullname) + + +class Adb(object): + """Class for interact with android rooted device through adb""" + file_re = None + current_re = re.compile(r'^(\./)?(?P.+):$') + + def __init__(self): + """Prepare archive content for operations""" + super(Adb, self).__init__() + self.conf = Conf() + self.error = '' + self._entries = [] + self._links = {} + self._got_root = False + + if self.conf.try_su: + self.__su_check() + + def _shell_cmd(self, with_root, *args): + cmd = [self.conf.adb_command, 'shell'] + + if with_root and self._got_root: + _args = [shlex.quote(x) for x in args] + cmd += ['su', '-c', shlex.quote(' '.join(_args))] + else: + cmd += args + + return cmd + + def __su_check(self): + """Check if we are able to get elevated privileges""" + cmd = self._shell_cmd(False, 'su -c whoami') + try: + with open(os.devnull, 'w') as fnull: + result = check_output(cmd, stderr=fnull) + + except subprocess.CalledProcessError: + return + + if 'root' in result: + self._got_root = True + + def _find_target(self, needle): + """Find link target""" + + if needle in self._links: + elem = self._links[needle] + target = os.path.abspath(os.path.join(elem.dirname, + elem.link_target)) + return self._find_target(target) + + for entry in self._entries: + if entry.filepath == needle: + return entry + + return None + + def _normalize_links(self): + """ + There might be a case of a chain of linked files, like: + + /foo -> /mnt/foo + /bar -> /foo + + If one want to follow such 'bar' link - MC in extfs mode will fail to + figure out the right target. This helper will correct the thing. + """ + elems_to_remove = [] + for entry in self._links.values(): + target_entry = self._find_target(entry.link_target) + if target_entry: + entry.link_target = target_entry.filepath + entry.mk_link_relative() + else: + elems_to_remove.append(self._entries.index(entry)) + + for idx in sorted(elems_to_remove, reverse=True): + del self._entries[idx] + + def _retrieve_single_dir_list(self, dir_): + """Retrieve file list using adb""" + lscmd = self.conf.box['rls'].format(shlex.quote(dir_)) + command = self._shell_cmd(True, *shlex.split(lscmd)) + + try: + if self.conf.debug: + print('executing', ' '.join(command)) + + lines = check_output(command) + except subprocess.CalledProcessError: + sys.stderr.write('Cannot read directory. Is device connected?\n') + return 1 + + lines = [l.strip() for l in lines.split('\n') if l.strip()] + if len(lines) == 1: + reg_match = self.file_re.match(lines[0]) + entry = File(**reg_match.groupdict()) + entry.update('/') + + if entry.filepath in self.conf.dirs_to_skip: + return + + self._entries.append(entry) + if entry.type == 'l': + self._links[entry.filepath] = entry + self._retrieve_single_dir_list(entry.link_target) + else: + for line in lines: + current_dir_re = self.current_re.match(line) + if current_dir_re: + current_dir = current_dir_re.groupdict()['dir'] + if not current_dir: + current_dir = '/' + continue + + reg_match = self.file_re.match(line) + if not reg_match: + continue + + entry = File(**reg_match.groupdict()) + if entry.name in ('.', '..'): + continue + + entry.update(current_dir) + + if entry.filepath in self.conf.dirs_to_skip: + continue + + self._entries.append(entry) + + if entry.type == 'l': + self._links[entry.filepath] = entry + + def _retrieve_file_list(self, root=None): + """Retrieve file list using adb""" + + if not root: + lscmd = self.conf.box['ls'] + else: + lscmd = self.conf.box['rls'].format(shlex.quote(root.filepath)) + + command = self._shell_cmd(True, *shlex.split(lscmd)) + + try: + if self.conf.debug: + print('executing', ' '.join(command)) + + lines = check_output(command) + except subprocess.CalledProcessError: + sys.stderr.write('Cannot read directory. Is device connected?\n') + return 2 + + current_dir = root.dirname if root else '/' + for line in lines.split('\n'): + line = line.strip() + current_dir_re = self.current_re.match(line) + if current_dir_re: + current_dir = current_dir_re.groupdict()['dir'] + if not current_dir: + current_dir = '/' + continue + + reg_match = self.file_re.match(line) + if not reg_match: + continue + + entry = File(**reg_match.groupdict()) + if entry.name in ('.', '..'): + continue + + entry.update(current_dir) + + if entry.filepath in self.conf.dirs_to_skip: + continue + + self._entries.append(entry) + if root is None and entry.type == 'd': + self._retrieve_file_list(entry) + + if entry.type == 'l': + self._links[entry.filepath] = entry + + def run(self, fname): + """Not supported""" + sys.stderr.write('Not supported - or maybe you are on compatible ' + 'architecture?\n') + return 3 + + def list(self): + """Output list contents directory""" + if self.error: + sys.stderr.write(self.error) + return 4 + + if self.conf.root: + self._retrieve_single_dir_list(self.conf.root) + else: + self._retrieve_file_list() + + self._normalize_links() + sys.stdout.write(''.join([str(entry) for entry in self._entries])) + return 0 + + def copyout(self, src, dst): + """Copy file form the device using adb.""" + if self.error: + sys.stderr.write(self.error) + return 5 + + cmd = [self.conf.adb_command, 'pull', src, dst] + if self.conf.debug: + sys.stderr.write(' '.join(cmd) + '\n') + + with open(os.devnull, 'w') as fnull: + try: + err = subprocess.call(cmd, stdout=fnull, stderr=fnull) + except subprocess.CalledProcessError: + sys.stderr.write('Error executing adb shell') + return 6 + + return err + + def copyin(self, src, dst): + """Copy file to the device through adb.""" + if self.error: + sys.stderr.write(self.error) + return 7 + if not dst.startswith('/'): + dst = '/' + dst + + cmd = [self.conf.adb_command, 'push', src, dst] + if self.conf.debug: + sys.stderr.write(' '.join(cmd) + '\n') + + with open(os.devnull, 'w') as fnull: + try: + err = subprocess.call(cmd, stdout=fnull, stderr=fnull) + except subprocess.CalledProcessError: + sys.stderr.write('Error executing adb shell') + return 8 + + if err != 0: + sys.stderr.write('Cannot push the file, ' + '%s, error %d' % (dst, err)) + return 9 + return 0 + + def rm(self, dst): + """Remove file from device.""" + if self.error: + sys.stderr.write(self.error) + return 10 + + cmd = self._shell_cmd(False, 'rm %s' % shlex.quote(dst)) + try: + err = check_output(cmd).strip() + except subprocess.CalledProcessError: + sys.stderr.write('Error executing adb shell') + return 11 + + if err != '': + sys.stderr.write(err) + return 12 + return 0 + + def rmdir(self, dst): + """Remove directory from device.""" + if self.error: + sys.stderr.write(self.error) + return 13 + + cmd = self._shell_cmd(False, 'rm -r %s' % shlex.quote(dst)) + try: + err = check_output(cmd).strip() + except subprocess.CalledProcessError: + sys.stderr.write('Error executing adb shell') + return 14 + + if err != '': + sys.stderr.write(err) + return 15 + return 0 + + def mkdir(self, dst): + """Make directory on the device through adb.""" + if self.error: + sys.stderr.write(self.error) + return 16 + + if not dst.startswith('/'): + dst = '/' + dst + + cmd = self._shell_cmd(False, 'mkdir %s' % shlex.quote(dst)) + try: + err = check_output(cmd).strip() + except subprocess.CalledProcessError: + sys.stderr.write('Error executing adb shell') + return 17 + + if err != '': + sys.stderr.write(err) + return 18 + return 0 + + +CALL_MAP = {'list': lambda a: Adb().list(), + 'copyin': lambda a: Adb().copyin(a.src, a.dst), + 'copyout': lambda a: Adb().copyout(a.src, a.dst), + 'mkdir': lambda a: Adb().mkdir(a.dst), + 'rmdir': lambda a: Adb().rmdir(a.dst), + 'rm': lambda a: Adb().rm(a.dst), + 'run': lambda a: Adb().run(a.dst)} + + +def main(): + """parse commandline""" + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(help='supported commands') + parser_list = subparsers.add_parser('list') + parser_copyin = subparsers.add_parser('copyin') + parser_copyout = subparsers.add_parser('copyout') + parser_rm = subparsers.add_parser('rm') + parser_mkdir = subparsers.add_parser('mkdir') + parser_rmdir = subparsers.add_parser('rmdir') + parser_run = subparsers.add_parser('run') + + parser_list.add_argument('arch') + parser_list.set_defaults(func=CALL_MAP['list']) + + parser_copyin.add_argument('arch') + parser_copyin.add_argument('dst') + parser_copyin.add_argument('src') + parser_copyin.set_defaults(func=CALL_MAP['copyin']) + + parser_copyout.add_argument('arch') + parser_copyout.add_argument('src') + parser_copyout.add_argument('dst') + parser_copyout.set_defaults(func=CALL_MAP['copyout']) + + parser_rm.add_argument('arch') + parser_rm.add_argument('dst') + parser_rm.set_defaults(func=CALL_MAP['rm']) + + parser_mkdir.add_argument('arch') + parser_mkdir.add_argument('dst') + parser_mkdir.set_defaults(func=CALL_MAP['mkdir']) + + parser_rmdir.add_argument('arch') + parser_rmdir.add_argument('dst') + parser_rmdir.set_defaults(func=CALL_MAP['rmdir']) + + parser_run.add_argument('arch') + parser_run.add_argument('dst') + parser_run.set_defaults(func=CALL_MAP['run']) + + parser.add_argument('--version', action='version', + version='%(prog)s ' + str(__version__)) + + args = parser.parse_args() + + try: + return args.func(args) + except AttributeError: + parser.print_help() + parser.exit() + + +if __name__ == '__main__': + sys.exit(main())