Source code for paks.commands.command

__author__ = "Vanessa Sochat, Alec Scott"
__copyright__ = "Copyright 2021-2022, Vanessa Sochat and Alec Scott"
__license__ = "Apache-2.0"

import os
import locale
import subprocess
import shlex
import sys
import paks.env
import paks.utils


class Result:
    """
    Outcome of a command.

    A Result object bundles the captured output and error, a numeric return
    code and an optional human readable message.

    Attributes:
        returncode: the value given as ``retval`` (0 by default).
        out: the value given as ``out``, or an empty list when it is falsy.
        err: the value given as ``err``, or an empty list when it is falsy.
        message: the value given as ``msg`` (may be None).
    """

    def __init__(self, out=None, err=None, retval=0, msg=None):
        self.message = msg
        self.returncode = retval
        # Falsy values (None, "", []) are normalized to a fresh empty list
        self.err = err if err else []
        self.out = out if out else []
class Command:
    """
    Base class to invoke shell commands and retrieve output and error.

    Commands are either written to a file descriptor (``self.out``) or run
    on the host through subprocess. This class is inspired and derived from
    utils functions in https://github.com/vsoch/scif
    """

    # Assume we can support all three
    supported_tech = ["docker", "podman", "singularity"]

    # Message to print before run
    pre_message = None

    # Parse kwargs? (e.g., envars will have=)
    parse_kwargs = True

    def __init__(self, tech, required=None, out=None):
        """
        Create a command for a container technology.

        Arguments:
            tech: name of the container technology, validated later by check()
            required: list of keyword names that check() insists on
            out: file descriptor to write commands to (defaults to stdout)
        """
        self.tech = tech
        self.required = required or []
        self.failed = False
        self.out = out or sys.stdout.fileno()

        # We don't need editor for interactive commands
        self.env = paks.env.Environment(quiet=True)

        # Don't add commands executed to history. Assigning to os.environ
        # also calls putenv, so child processes see the value too.
        os.environ["HISTCONTROL"] = "ignorespace"

    def execute(self, cmd):
        """
        Execute a command to the container by writing it to self.out
        """
        # Extra space prevents saving to history
        os.write(self.out, self.encode(" \r %s" % cmd))

    def execute_get(self, runcmd, getcmd):
        """
        Execute and get runs a command inside the container (pipes to
        temporary file) and then loads from the outside.

        Returns the standard output of getcmd; its standard error is dropped.
        """
        # This is run inside the container
        self.run_hidden(runcmd)
        out, _ = self.execute_host(getcmd)
        return out

    def execute_host(self, cmd):
        """
        Execute a command to the host, return a tuple of (out, err) as text
        """
        # This is run outside the container
        res = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        out, err = res.communicate()
        return out, err

    def run_hidden(self, cmd):
        """
        Run a hidden command by writing it to self.out
        """
        # TODO how to hide this?
        os.write(self.out, self.encode(" %s\r" % cmd))

    def check(self, **kwargs):
        """
        Check ensures that:

        1. The container tech of the command matches the class
        2. Required arguments are provided.

        On success nothing is returned, and self.args / self.kwargs are set
        (extended with anything parsed from the "original" command line).
        On failure a failed Result is returned.
        """
        if self.tech not in self.supported_tech:
            return self.return_failure(
                "This command is not specified to run with %s." % self.tech
            )

        # Required args for all commands
        for entry in self.required:
            if entry not in kwargs:
                return self.return_failure("%s is required." % entry)

        original = kwargs.get("original")
        self.kwargs = kwargs

        # Get args (parsing from the original command line)
        parsed_args = []
        if original:
            parsed_args, parsed_kwargs = self.get_args(original)
            self.kwargs.update(parsed_kwargs)
        self.args = parsed_args

    def get_args(self, cmd):
        """
        Once we get here, we only care about additional command args.

        The command string is split on single spaces and the first piece
        (the command itself) is discarded. Pieces with "=" become kwargs
        unless parse_kwargs is False, everything else is a positional arg.

        Returns a tuple of (args, kwargs).
        """
        parts = cmd.split(" ")

        # Pop off the command (we already use it)
        parts.pop(0)

        kwargs = {}
        args = []
        for part in parts:
            # This is a kwarg
            if "=" in part and self.parse_kwargs:
                key, val = part.split("=", 1)
                kwargs[key.strip()] = val.strip()
                continue

            # This is an arg (or a command chose to not split/parse)
            part = part.strip()

            # Don't append empty args
            if part:
                args.append(part)
        return args, kwargs

    def return_failure(self, message, out=None, err=None):
        """
        Return a failed result (requires a message)
        """
        return Result(msg=message, retval=1, out=out, err=err)

    def return_success(self, message=None, out=None, err=None):
        """
        Return a successful result, carrying the provided output and error
        """
        return Result(msg=message, retval=0, out=out, err=err)

    def do_print(self, line, clear=True):
        """
        Print a line followed by a carriage return, optionally after first
        printing a carriage return on a line of its own.
        """
        if clear:
            print("\r")
        print(line, end="\r")

    def run_command(self, cmd, output="output"):
        """
        Wrapper to stream a command, which handles returning a result
        on error.

        Every streamed line is printed. Returns whatever stream_command
        returns: a failed Result on non-zero exit, otherwise None.
        """
        print("\r")
        lines = self.stream_command(cmd, output)
        while True:
            try:
                line = next(lines)
            # The generator's return value is how we get the result
            except StopIteration as finished:
                return finished.value
            self.do_print(line, False)

    def stream_command(self, cmd, output="output"):
        """
        Stream a command and use output or error.

        Arguments:
            cmd: the command (list of strings) handed to subprocess.Popen
            output: "error" to stream stderr, anything else streams stdout

        Yields each line of the selected stream. The generator's return
        value is a failed Result if the process exits non-zero, else None.
        """
        use_error = output == "error"

        # Only the selected stream is piped. The other one was never read,
        # so send it to devnull: an unread pipe that fills up would block
        # the child forever.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL if use_error else subprocess.PIPE,
            stderr=subprocess.PIPE if use_error else subprocess.DEVNULL,
            universal_newlines=True,
        )
        stream = process.stderr if use_error else process.stdout

        # Stream lines back to the caller
        try:
            for line in iter(stream.readline, ""):
                yield line
        finally:
            stream.close()
        return_code = process.wait()

        # If failed, send failed result up to calling function
        if return_code:
            return self.return_failure("Failed: %s" % " ".join(cmd))

    def parse_command(self, cmd):
        """
        This is called when a new command is provided to ensure we have
        a list. We don't check that the executable is on the path,
        as the initialization might not occur in the runtime environment.
        """
        if not isinstance(cmd, list):
            cmd = shlex.split(cmd)
        return cmd

    def encode(self, line):
        """
        Encode a string as utf-8 bytes
        """
        return line.encode("utf-8")

    def decode(self, line):
        """
        Given a line of output (error or regular) decode using the system
        preferred encoding, if appropriate.

        Anything that is not bytes is returned untouched, as are bytes that
        cannot be decoded (this is best effort and never raises).
        """
        if not isinstance(line, (bytes, bytearray)):
            return line
        encoding = locale.getpreferredencoding(False) or "utf-8"
        try:
            return line.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            return line