Source code for paks.backends.base

__author__ = "Vanessa Sochat, Alec Scott"
__copyright__ = "Copyright 2021-2022, Vanessa Sochat and Alec Scott"
__license__ = "Apache-2.0"

from paks.utils.names import namer
from paks.logger import logger
import paks.utils
import paks.defaults
import paks.templates
import paks.commands
import paks.settings

import subprocess
import select
import string
import pty
import termios
import tty
import os
import sys
import re


class ContainerName:
    """
    Break a raw container identifier into its named components.

    After construction the instance exposes ``registry``, ``repository``,
    ``tool``, ``version`` and ``digest`` (each ``None`` unless ``parse`` sets
    it), plus a lazily generated friendly name.
    """

    def __init__(self, raw):
        self.raw = raw

        # Every known component starts out unset; parse() fills in what it finds.
        self.registry = self.repository = self.tool = None
        self.version = self.digest = None
        self.parse(raw)

        # Friendly name, generated on first access of `name` / `extended_name`.
        self._name = None

    @property
    def extended_name(self):
        """
        The slug joined to the generated friendly name with a dash.
        """
        self._name = self._name or namer.generate()
        return "-".join((self.slug, self._name))

    @property
    def name(self):
        """
        The generated friendly name (created once, then reused).
        """
        self._name = self._name or namer.generate()
        return self._name

    @property
    def slug(self):
        """
        Dash-joined registry, repository, tool and version with dots removed.
        """
        slug = ""
        for part in (self.registry, self.repository, self.tool, self.version):
            if not part:
                continue
            cleaned = part.replace(".", "")
            # The first non-empty piece starts the slug; later ones are appended.
            slug = slug + "-" + cleaned if slug else cleaned
        return slug

    def parse(self, raw):
        """
        Match the raw name against the docker regex and store each named group.

        Each matched group is stored as an attribute with surrounding slashes
        stripped; groups that matched nothing are stored as None.
        """
        found = re.search(paks.templates.docker_regex, raw)
        if found is None:
            logger.exit("%s does not match a known identifier pattern." % raw)

        for field, text in found.groupdict().items():
            setattr(self, field, text.strip("/") if text else None)
class ContainerTechnology:
    """
    A base class for a container technology.

    Methods here read ``self.commands``, ``self.uri`` and ``self.image``, none
    of which are assigned in this class; presumably a concrete backend or the
    caller sets them (confirm against the subclasses).
    """

    # Everything that is NOT a word character, ASCII punctuation, space, CR or
    # LF. The punctuation is escaped so that characters that are special inside
    # a character class (backslash, "]", "^", "-") are kept as literals rather
    # than being interpreted by the regex engine.
    _disallowed_chars = re.compile(
        r"[^a-zA-Z0-9%s\n\r\w ]" % re.escape(string.punctuation)
    )

    # What is left of the up / down arrow escape sequences once ESC is removed.
    _arrow_keys = re.compile(r"(\[A|\[B)")

    def __init__(self, settings=None):
        if not settings:
            settings = paks.settings.Settings(paks.defaults.settings_file)
        self.settings = settings

    def get_history(self, line, openpty):
        """
        Given an input with some number of up/down and newline, derive command.

        Arguments:
          line (str): sanitized input containing "[A" (up) / "[B" (down) markers.
          openpty (int): pty file descriptor handed to the history command.

        Returns the history entry that is (ups - downs) lines back, followed by
        whatever comes after the first arrow marker in ``line``, or an empty
        string when the net movement is not upward or reaches past the history.
        """
        up = line.count("[A")
        down = line.count("[B")
        change = up - down

        # pushed down below history
        if change <= 0:
            return ""

        history = self.hist.run(
            container_name=self.uri.extended_name,
            out=openpty,
            history_file=self.settings.history_file,
            user=self.settings.user,
        )
        history = [x for x in history.split("\n") if x]
        if not history or change > len(history):
            return ""

        # here we are looking back up into history (negative index)
        newline = history[-1 * change]

        # Add back any characters typed
        newline += self._arrow_keys.split(line, maxsplit=1)[-1]
        return newline

    def encode(self, msg):
        """
        Return msg encoded as UTF-8 bytes.
        """
        return msg.encode("utf-8")

    def interactive_command(self, cmd):
        """
        Ensure we always restore original TTY otherwise terminal gets messed up

        The value returned by the inner loop is not passed on to the caller.
        """
        # Controller to get history
        self.hist = self.commands.history

        # save original tty settings; the inner call switches stdin to raw mode
        old_tty = termios.tcgetattr(sys.stdin)
        old_pty = termios.tcgetattr(sys.stdout)
        try:
            self._interactive_command(cmd)
        finally:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
            termios.tcsetattr(sys.stdout, termios.TCSADRAIN, old_pty)

    def run_executor(self, string_input, openpty):
        """
        Given a string input, run executor

        Only input that starts with "#" (after arrow markers are removed) is
        considered; anything else returns without doing anything.
        """
        string_input = string_input.replace("[A", "").replace("[B", "")
        if not string_input.startswith("#"):
            return

        executor = self.commands.get_executor(string_input, out=openpty)
        if executor is None:
            return

        # Provide pre-command message to the terminal
        if executor.pre_message:
            print("\n\r" + executor.pre_message)

        # If we have an executor for the command, run it!
        # All commands require the original / current name
        result = executor.run(
            name=self.image,
            container_name=self.uri.extended_name,
            original=string_input,
        )
        if result.message:
            print("\r" + result.message)

    def clean(self, string_input):
        """
        Strip control characters and escape bytes from typed input.

        Word characters, ASCII punctuation (including the backslash), spaces,
        carriage returns and newlines are kept; everything else, ESC included,
        is removed.
        """
        return self._disallowed_chars.sub("", string_input)

    def welcome(self, openpty):
        """
        Welcome the user and clear terminal
        """
        # Don't add commands executed to history
        # (each command starts with a space; presumably the shell is set up to
        # skip such lines in its history -- confirm)
        os.write(openpty, self.encode(" export PROMPT_COMMAND='history -a'\r"))
        os.write(openpty, self.encode(" clear\r"))
        os.write(openpty, self.encode(" ### Welcome to PAKS! ###\r"))

    def _interactive_command(self, cmd):
        """
        Run an interactive command.

        Starts cmd attached to a new pseudo-terminal, then relays bytes between
        the real terminal and the pty while tracking the line being typed so
        that "#" commands and "exit" can be intercepted.

        Returns ``self.uri.extended_name`` when the user enters a line
        containing "exit"; returns None if the process ends on its own.
        """
        tty.setraw(sys.stdin.fileno())

        # open pseudo-terminal to interact with subprocess
        openpty, opentty = pty.openpty()

        # use os.setsid() make it run in a new process group, or bash job
        # control will not be enabled
        p = subprocess.Popen(
            cmd,
            preexec_fn=os.setsid,
            stdin=opentty,
            stdout=opentty,
            stderr=opentty,
            universal_newlines=True,
        )

        # Welcome to Paks!
        self.welcome(openpty)
        string_input = ""

        while p.poll() is None:
            readable, _, _ = select.select([sys.stdin, openpty], [], [])
            if sys.stdin in readable:
                terminal_input = os.read(sys.stdin.fileno(), 10240)

                # The decoded text is only used to track what was typed; the
                # raw bytes are what get forwarded. Undecodable bytes are
                # dropped here instead of aborting the session.
                new_char = terminal_input.decode("utf-8", errors="ignore")

                # if we have a backspace (ord 127)
                if len(new_char) == 1 and ord(new_char) == 127:

                    # Backspace to empty line
                    if len(string_input) > 0:
                        string_input = string_input[:-1]
                    if not string_input:
                        os.write(openpty, terminal_input)
                        continue
                else:
                    string_input = string_input + new_char

                # Get rid of left/right
                string_input = string_input.replace("[D", "").replace("[C", "")
                has_newline = "\n" in string_input or "\r" in string_input

                # Replace weird characters and escape sequences
                string_input = self.clean(string_input)
                has_arrow = "[A" in string_input or "[B" in string_input

                # Universal exit command
                if "exit" in string_input and has_newline:
                    print("\n\rContainer exited.\n\r")
                    return self.uri.extended_name

                # Pressing up or down, but not enter
                if has_arrow and not has_newline:
                    string_input = self.get_history(string_input, openpty)
                    os.write(openpty, terminal_input)
                    continue

                # Pressing up or down with enter
                if has_arrow and has_newline:
                    string_input = self.get_history(string_input, openpty)
                    os.write(openpty, terminal_input)

                # NOTE(review): the flattened source does not show whether this
                # guard was nested under the branch above; it is kept at loop
                # level here, which means input that sanitizes to an empty
                # string is not forwarded to the pty -- confirm against the
                # original file.
                if not string_input:
                    continue

                # If we have a newline (and possibly a command)
                if has_newline:
                    self.run_executor(string_input, openpty)

                    # Add derived line to the history
                    os.write(openpty, terminal_input)
                    string_input = ""
                else:
                    os.write(openpty, terminal_input)

            elif openpty in readable:
                o = os.read(openpty, 10240)
                if o:
                    os.write(sys.stdout.fileno(), o)

    def __str__(self):
        return type(self).__name__