#! /usr/bin/env python3 """ adbfs Virtual filesystem for Midnight Commander * Copyright (c) 2016, Roman Dobosz, * Published under 3-clause BSD-style license (see LICENSE file) """ import configparser import argparse from datetime import datetime import json import os import re import subprocess import sys import shlex __version__ = 0.14 XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config')) def check_output(command_list, stderr=None): """ For some reason, in py3 it was decided that command output should be bytes instead of string. This little function will check if we have string or bytes and in case of bytes it will convert it to string. """ result = subprocess.check_output(command_list, stderr=stderr) if not isinstance(result, str): _result = [] for t in result.split(b'\n'): if not t: continue try: _result.append(t.decode('utf-8')) except UnicodeDecodeError: _result.append(t.decode('iso-8859-1')) result = '\n'.join(_result) + '\n' return result class Conf(object): """Simple config parser""" boxes = {'busybox': {'ls': 'busybox ls -anel', 'rls': 'busybox ls -Ranel {}', 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' r'(?P\d+)\s' r'(?P\d+)\s+' r'(?P\d+)\s+' r'(?P\d+)\s[A-Z,a-z]{3}\s' r'(?P[A-Z,a-z]{3}\s+' r'\d+\s\d{2}:\d{2}:\d{2}\s+\d{4})\s' r'(?P.*)'}, 'toolbox': {'ls': 'toolbox ls -anl', 'rls': 'toolbox ls -Ranl {}', 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' r'(?P\d+)\s+' r'(?P\d+)\s+' r'(?P\d+)?\s' r'(?P\d{4}-\d{2}-\d{2}\s' r'\d{2}:\d{2})\s' r'(?P.*)'}, 'toybox': {'ls': 'toybox ls -anl', 'rls': 'toybox ls -Ranl {}', 'file_re': r'^(?P[-bcdlps][-rwxsStT]{9})\s+' r'(?P\d+)\s+' r'(?P\d+)\s+' r'(?P\d+)\s+' r'(?P\d+)?\s' r'(?P\d{4}-\d{2}-\d{2}\s' r'\d{2}:\d{2})\s' r'(?P.*)'}} def __init__(self): self.box = None self.debug = False self.dirs_to_skip = ['acct', 'charger', 'd', 'dev', 'proc', 'sys'] self.root = None self.suppress_colors = False self.adb_command = 'adb' self.adb_connect = '' self.try_su = False self.read() self.connect() self.get_the_box() def get_the_box(self): """Detect if we dealing with busybox or toolbox""" cmd = [self.adb_command] + 'shell which'.split() try: with open(os.devnull, 'w') as fnull: result = check_output(cmd + ['busybox'], stderr=fnull) if 'busybox' in result: self.box = Conf.boxes['busybox'] if self.suppress_colors: self.box.update({'ls': 'busybox ls --color=none -anel', 'rls': 'busybox ls --color=none ' '-Ranel {}'}) Adb.file_re = re.compile(self.box['file_re']) return except subprocess.CalledProcessError: pass try: with open(os.devnull, 'w') as fnull: result = check_output(cmd + ['toybox'], stderr=fnull) if 'toybox' in result: self.box = Conf.boxes['toybox'] Adb.file_re = re.compile(self.box['file_re']) return except subprocess.CalledProcessError: pass try: with open(os.devnull, 'w') as fnull: result = check_output(cmd + ['toolbox'], stderr=fnull) if 'toolbox' in result: self.box = Conf.boxes['toolbox'] Adb.file_re = re.compile(self.box['file_re']) return except subprocess.CalledProcessError: pass sys.stderr.write('There is no toolbox or busybox available.\n') sys.exit(1) def get_attached_devices(self): """Return a list of attached devices""" cmd = [self.adb_command, 'devices'] devices = [] try: with open(os.devnull, 'w') as fnull: result = check_output(cmd, stderr=fnull) except subprocess.CalledProcessError: result = '' for line in result.split('\n'): if line.startswith('*'): continue if line.strip() == 'List of devices attached': continue if line.strip() == '': continue identifier, _ = line.split() devices.append(identifier) return devices def connect(self): """ If adb_connect is non empty string, perform connecting to specified device over network using an address (or hostname). """ if not self.adb_connect: return devices = self.get_attached_devices() for device in devices: if self.adb_connect in device: return # already connected, no need to reconnect cmd = [self.adb_command, 'connect', self.adb_connect] with open(os.devnull, 'w') as fnull: result = check_output(cmd, stderr=fnull) if result.split()[0] == 'connected': subprocess.call([self.adb_command, 'wait-for-device']) return sys.stderr.write('Unable to connect to `%s\'. Is adb over network ' 'enabled on device?\n' % self.adb_connect) sys.exit(2) def read(self): """ Read config file and change the options according to values from that file. """ if not os.path.exists(XDG_CONFIG_HOME): return conf_fname = os.path.join(XDG_CONFIG_HOME, 'mc', 'adbfs.ini') if not os.path.exists(conf_fname): return cfg = configparser.ConfigParser() cfg_map = {'debug': (cfg.getboolean, 'debug'), 'dirs_to_skip': (cfg.get, 'dirs_to_skip'), 'suppress_colors': (cfg.get, 'suppress_colors'), 'root': (cfg.get, 'root'), 'adb_command': (cfg.get, 'adb_command'), 'adb_connect': (cfg.get, 'adb_connect'), 'try_su': (cfg.getboolean, 'try_su')} cfg.read(conf_fname) for key, (function, attr) in cfg_map.items(): try: setattr(self, attr, function('adbfs', key)) except (configparser.NoSectionError, configparser.NoOptionError): pass if self.dirs_to_skip and isinstance(self.dirs_to_skip, str): self.dirs_to_skip = json.loads(self.dirs_to_skip) self.dirs_to_skip = [x.encode('utf-8') for x in self.dirs_to_skip] else: self.dirs_to_skip = [] if self.adb_command: self.adb_command = os.path.expandvars(self.adb_command) self.adb_command = os.path.expanduser(self.adb_command) class File(object): """Item in filesystem representation""" def __init__(self, perms=None, links=1, uid=0, gid=0, size=None, date_time=None, date=None, name=None): """initialize file""" self.perms = perms self.links = links self.uid = uid self.gid = gid self.size = size if size else 0 self.date_time = date_time # as string self.name = name self.date = date # as string self.dirname = '' self.type = None self.string = None self.link_target = None self.filepath = None def _correct_link(self): """Canonize filename and fill the link attr""" try: name, target = self.name.split(' -> ') except ValueError: return self.name = name if not self.size: self.size = 0 if target.startswith('/'): self.link_target = target else: self.link_target = os.path.abspath(os.path.join(self.dirname, target)) def update(self, dirname): """update object fields""" month_num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} self.dirname = dirname if self.date_time: date = self.date_time.split() date = '%s-%02d-%s %s' % (date[1], month_num[date[0]], date[3], date[2]) date = datetime.strptime(date, '%d-%m-%Y %H:%M:%S') elif self.date: date = datetime.strptime(self.date, '%Y-%m-%d %H:%M') self.date_time = date.strftime('%m/%d/%Y %H:%M:01') self.type = self.perms[0] if self.perms else None if self.type == 'l' and ' -> ' in self.name: self._correct_link() self.filepath = os.path.join(self.dirname, self.name) def mk_link_relative(self): """Convert links to relative""" self.link_target = os.path.relpath(self.link_target, self.dirname) def __repr__(self): """represent the file/entire node""" fullname = os.path.join(self.dirname, self.name) if self.link_target: fullname += ' -> ' + self.link_target return ''.format(type=self.type, name=fullname, id=hex(id(self))) def __str__(self): """display the file/entire node""" template = ('{perms} {links:>4} {uid:<8} {gid:<8} {size:>8} ' '{date_time} {fullname}\n') if not self.name: return '' fullname = os.path.join(self.dirname, self.name) if self.link_target: fullname += ' -> ' + self.link_target return template.format(perms=self.perms, links=self.links, uid=self.uid, gid=self.gid, size=self.size, date_time=self.date_time, fullname=fullname) class Adb(object): """Class for interact with android rooted device through adb""" file_re = None current_re = re.compile(r'^(\./)?(?P.+):$') def __init__(self): """Prepare archive content for operations""" super(Adb, self).__init__() self.conf = Conf() self.error = '' self._entries = [] self._links = {} self._got_root = False if self.conf.try_su: self.__su_check() def _shell_cmd(self, with_root, *args): cmd = [self.conf.adb_command, 'shell'] if with_root and self._got_root: _args = [shlex.quote(x) for x in args] cmd += ['su', '-c', shlex.quote(' '.join(_args))] else: cmd += args return cmd def __su_check(self): """Check if we are able to get elevated privileges""" cmd = self._shell_cmd(False, 'su -c whoami') try: with open(os.devnull, 'w') as fnull: result = check_output(cmd, stderr=fnull) except subprocess.CalledProcessError: return if 'root' in result: self._got_root = True def _find_target(self, needle): """Find link target""" if needle in self._links: elem = self._links[needle] target = os.path.abspath(os.path.join(elem.dirname, elem.link_target)) return self._find_target(target) for entry in self._entries: if entry.filepath == needle: return entry return None def _normalize_links(self): """ There might be a case of a chain of linked files, like: /foo -> /mnt/foo /bar -> /foo If one want to follow such 'bar' link - MC in extfs mode will fail to figure out the right target. This helper will correct the thing. """ elems_to_remove = [] for entry in self._links.values(): target_entry = self._find_target(entry.link_target) if target_entry: entry.link_target = target_entry.filepath entry.mk_link_relative() else: elems_to_remove.append(self._entries.index(entry)) for idx in sorted(elems_to_remove, reverse=True): del self._entries[idx] def _retrieve_single_dir_list(self, dir_): """Retrieve file list using adb""" lscmd = self.conf.box['rls'].format(shlex.quote(dir_)) command = self._shell_cmd(True, *shlex.split(lscmd)) try: if self.conf.debug: print('executing', ' '.join(command)) lines = check_output(command) except subprocess.CalledProcessError: sys.stderr.write('Cannot read directory. Is device connected?\n') return 1 lines = [l.strip() for l in lines.split('\n') if l.strip()] if len(lines) == 1: reg_match = self.file_re.match(lines[0]) entry = File(**reg_match.groupdict()) entry.update('/') if entry.filepath in self.conf.dirs_to_skip: return self._entries.append(entry) if entry.type == 'l': self._links[entry.filepath] = entry self._retrieve_single_dir_list(entry.link_target) else: for line in lines: current_dir_re = self.current_re.match(line) if current_dir_re: current_dir = current_dir_re.groupdict()['dir'] if not current_dir: current_dir = '/' continue reg_match = self.file_re.match(line) if not reg_match: continue entry = File(**reg_match.groupdict()) if entry.name in ('.', '..'): continue entry.update(current_dir) if entry.filepath in self.conf.dirs_to_skip: continue self._entries.append(entry) if entry.type == 'l': self._links[entry.filepath] = entry def _retrieve_file_list(self, root=None): """Retrieve file list using adb""" if not root: lscmd = self.conf.box['ls'] else: lscmd = self.conf.box['rls'].format(shlex.quote(root.filepath)) command = self._shell_cmd(True, *shlex.split(lscmd)) try: if self.conf.debug: print('executing', ' '.join(command)) lines = check_output(command) except subprocess.CalledProcessError: sys.stderr.write('Cannot read directory. Is device connected?\n') return 2 current_dir = root.dirname if root else '/' for line in lines.split('\n'): line = line.strip() current_dir_re = self.current_re.match(line) if current_dir_re: current_dir = current_dir_re.groupdict()['dir'] if not current_dir: current_dir = '/' continue reg_match = self.file_re.match(line) if not reg_match: continue entry = File(**reg_match.groupdict()) if entry.name in ('.', '..'): continue entry.update(current_dir) if entry.filepath in self.conf.dirs_to_skip: continue self._entries.append(entry) if root is None and entry.type == 'd': self._retrieve_file_list(entry) if entry.type == 'l': self._links[entry.filepath] = entry def run(self, fname): """Not supported""" sys.stderr.write('Not supported - or maybe you are on compatible ' 'architecture?\n') return 3 def list(self): """Output list contents directory""" if self.error: sys.stderr.write(self.error) return 4 if self.conf.root: self._retrieve_single_dir_list(self.conf.root) else: self._retrieve_file_list() self._normalize_links() sys.stdout.write(''.join([str(entry) for entry in self._entries])) return 0 def copyout(self, src, dst): """Copy file form the device using adb.""" if self.error: sys.stderr.write(self.error) return 5 cmd = [self.conf.adb_command, 'pull', src, dst] if self.conf.debug: sys.stderr.write(' '.join(cmd) + '\n') with open(os.devnull, 'w') as fnull: try: err = subprocess.call(cmd, stdout=fnull, stderr=fnull) except subprocess.CalledProcessError: sys.stderr.write('Error executing adb shell') return 6 return err def copyin(self, src, dst): """Copy file to the device through adb.""" if self.error: sys.stderr.write(self.error) return 7 if not dst.startswith('/'): dst = '/' + dst cmd = [self.conf.adb_command, 'push', src, dst] if self.conf.debug: sys.stderr.write(' '.join(cmd) + '\n') with open(os.devnull, 'w') as fnull: try: err = subprocess.call(cmd, stdout=fnull, stderr=fnull) except subprocess.CalledProcessError: sys.stderr.write('Error executing adb shell') return 8 if err != 0: sys.stderr.write('Cannot push the file, ' '%s, error %d' % (dst, err)) return 9 return 0 def rm(self, dst): """Remove file from device.""" if self.error: sys.stderr.write(self.error) return 10 cmd = self._shell_cmd(False, 'rm %s' % shlex.quote(dst)) try: err = check_output(cmd).strip() except subprocess.CalledProcessError: sys.stderr.write('Error executing adb shell') return 11 if err != '': sys.stderr.write(err) return 12 return 0 def rmdir(self, dst): """Remove directory from device.""" if self.error: sys.stderr.write(self.error) return 13 cmd = self._shell_cmd(False, 'rm -r %s' % shlex.quote(dst)) try: err = check_output(cmd).strip() except subprocess.CalledProcessError: sys.stderr.write('Error executing adb shell') return 14 if err != '': sys.stderr.write(err) return 15 return 0 def mkdir(self, dst): """Make directory on the device through adb.""" if self.error: sys.stderr.write(self.error) return 16 if not dst.startswith('/'): dst = '/' + dst cmd = self._shell_cmd(False, 'mkdir %s' % shlex.quote(dst)) try: err = check_output(cmd).strip() except subprocess.CalledProcessError: sys.stderr.write('Error executing adb shell') return 17 if err != '': sys.stderr.write(err) return 18 return 0 CALL_MAP = {'list': lambda a: Adb().list(), 'copyin': lambda a: Adb().copyin(a.src, a.dst), 'copyout': lambda a: Adb().copyout(a.src, a.dst), 'mkdir': lambda a: Adb().mkdir(a.dst), 'rmdir': lambda a: Adb().rmdir(a.dst), 'rm': lambda a: Adb().rm(a.dst), 'run': lambda a: Adb().run(a.dst)} def main(): """parse commandline""" parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(help='supported commands') parser_list = subparsers.add_parser('list') parser_copyin = subparsers.add_parser('copyin') parser_copyout = subparsers.add_parser('copyout') parser_rm = subparsers.add_parser('rm') parser_mkdir = subparsers.add_parser('mkdir') parser_rmdir = subparsers.add_parser('rmdir') parser_run = subparsers.add_parser('run') parser_list.add_argument('arch') parser_list.set_defaults(func=CALL_MAP['list']) parser_copyin.add_argument('arch') parser_copyin.add_argument('dst') parser_copyin.add_argument('src') parser_copyin.set_defaults(func=CALL_MAP['copyin']) parser_copyout.add_argument('arch') parser_copyout.add_argument('src') parser_copyout.add_argument('dst') parser_copyout.set_defaults(func=CALL_MAP['copyout']) parser_rm.add_argument('arch') parser_rm.add_argument('dst') parser_rm.set_defaults(func=CALL_MAP['rm']) parser_mkdir.add_argument('arch') parser_mkdir.add_argument('dst') parser_mkdir.set_defaults(func=CALL_MAP['mkdir']) parser_rmdir.add_argument('arch') parser_rmdir.add_argument('dst') parser_rmdir.set_defaults(func=CALL_MAP['rmdir']) parser_run.add_argument('arch') parser_run.add_argument('dst') parser_run.set_defaults(func=CALL_MAP['run']) parser.add_argument('--version', action='version', version='%(prog)s ' + str(__version__)) args = parser.parse_args() try: return args.func(args) except AttributeError: parser.print_help() parser.exit() if __name__ == '__main__': sys.exit(main())