From 7b8fbf672b8e70bf7ae83f2449f566aa99d3227b Mon Sep 17 00:00:00 2001 From: Gnarwhal Date: Sat, 31 Aug 2024 05:42:26 +0000 Subject: [PATCH] Reorganised to use a plugin system --- src/sshare/main.py | 225 ++++++++---------- src/sshare/plugins/config.py | 27 +++ src/sshare/plugins/default/append_type.py | 34 +++ .../default/command_line.py} | 0 src/sshare/plugins/default/current_time.py | 55 +++++ src/sshare/plugins/default/file.py | 36 +++ src/sshare/plugins/default/ssh.py | 64 +++++ src/sshare/plugins/source.py | 24 ++ 8 files changed, 337 insertions(+), 128 deletions(-) create mode 100644 src/sshare/plugins/config.py create mode 100644 src/sshare/plugins/default/append_type.py rename src/sshare/{command_line_logger.py => plugins/default/command_line.py} (100%) create mode 100644 src/sshare/plugins/default/current_time.py create mode 100644 src/sshare/plugins/default/file.py create mode 100644 src/sshare/plugins/default/ssh.py create mode 100644 src/sshare/plugins/source.py diff --git a/src/sshare/main.py b/src/sshare/main.py index 30c27e7..f0c63ae 100644 --- a/src/sshare/main.py +++ b/src/sshare/main.py @@ -26,69 +26,11 @@ import sys from pathlib import Path from version import version -class Config: - def __init__(self, config_directory): - _config = self._load_from_file(config_directory / "config.toml") +from plugins.config import Default +from plugins.config import NoDefault +from plugins.config import Flags - self.source_directory = _config.get("source_directory") - - host = _config.get("host") - if host == None: - print("Error: 'host' cannot be 'None'") - sys.exit(1) - self.host_protocol = host.get("protocol") - self.host_name = host.get("name") - self.host_port = host.get("port") - self.host_path = host.get("path") - if self.host_protocol == None: - self.host_protocol = "https" - if self.host_name == None: - print("Error: 'host.name' cannot be 'None'") - sys.exit(1) - if self.host_port == None: - self.host_port = "" - else: - self.host_port = f":{self.host_port}" - if self.host_path == None: - self.host_path = "" - - ssh = _config.get("ssh") - if ssh == None: - print("Error: 'ssh' cannot be 'None'") - sys.exit(1) - self.ssh_port = ssh.get("port") - self.ssh_user = ssh.get("user") - self.ssh_path = ssh.get("path") - if self.ssh_port == None: - self.ssh_port = 22 - if self.ssh_user == None: - self.ssh_user = getpass.getuser() - if self.ssh_path == None: - print("Error: 'ssh.path' cannot be 'None'") - sys.exit(1) - - def _load_from_file(self, config_path): - with open(config_path, mode="rb") as file: - return tomllib.load(file) - - -def rebase(number): - if number == 0: - return "0" - rebased = "" - while number != 0: - digit = number % 62 - if digit < 10: - rebased = chr(digit + 48) + rebased - elif digit < 36: - rebased = chr(digit + 87) + rebased - else: - rebased = chr(digit + 29) + rebased - number = int(number / 62) - return rebased - - -class MetaLogger: +class Congloggerate: def __init__(self, loggers): def info(message): for logger in loggers: @@ -106,6 +48,14 @@ class MetaLogger: self.warn = warn self.error = error + fatalicize(self) + +def fatalicize(logger): + def fatal(message): + logger.error(message) + sys.exit(1) + setattr(logger, "fatal", fatal) + class Plugin: def __init__(self, name, module): self.name = name @@ -113,12 +63,32 @@ class Plugin: def main(): config_directory = Path(os.environ.get("XDG_CONFIG_DIR") or f"{os.environ["HOME"]}/.config") / "sshare" - logger = importlib.import_module("command_line_logger") - modules = { - "logger": [ Plugin("command_line_logger", logger) ], - "data": [], + with open(config_directory / "config.toml", mode="rb") as file: + config = tomllib.load(file) + + # Load command line early and set it as the active logger + # so that it can be used to report errors while loading and + # configuring plugins + # i.e. before other logging plugins have had a chance to be initialised + logger = importlib.import_module("plugins.default.command_line") + fatalicize(logger) + + # Load inbuilt plugins + plugins_flat = [ + Plugin("command_line", logger), + Plugin("file", importlib.import_module("plugins.default.file")), + Plugin("current_time", importlib.import_module("plugins.default.current_time")), + Plugin("append_type", importlib.import_module("plugins.default.append_type")), + Plugin("ssh", importlib.import_module("plugins.default.ssh")), + ] + plugins = { + "logger": [], + "source": [], + "name": [], + "upload": [], } - error = False + + # Load external plugins sys.dont_write_bytecode = True for plugin in (config_directory / "plugins").iterdir(): if plugin.is_file(): @@ -128,77 +98,76 @@ def main(): ) module = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(module) + plugins_flat.append(Plugin(plugin.stem, module)) + sys.dont_write_bytecode = False - modules_of_type = modules.get(module.plugin_type) - if modules_of_type == None: - logger.error(f"Error: Plugin '{plugin.stem}' has an invalid plugin type '{module.plugin_type}'") - error = True - else: - modules_of_type.append(Plugin(plugin.stem, module)) - if hasattr(module, "init"): - module.init() + # Set plugin configurations from config file + if config.get("plugins") == None: + config["plugins"] = {} + for plugin in plugins_flat: + if hasattr(plugin.module, "config"): + plugin_config = config["plugins"].get(plugin.name) + if plugin_config != None: + for config_entry in plugin_config.items(): + plugin.module.config[config_entry[0]] = Default(config_entry[1]) + + # Flatten plugin configs + class PluginConfig: pass + error = False + for plugin in plugins_flat: + if hasattr(plugin.module, "config"): + config = plugin.module.config + plugin.module.config = PluginConfig() + for config_entry in config.items(): + if isinstance(config_entry[1], NoDefault): + logger.error(f"{plugin.name} > Error: Value '{config_entry[0]}' has no default value and must be specified explicitly") + error = True + elif isinstance(config_entry[1], Default): + setattr(plugin.module.config, config_entry[0], config_entry[1].value) + else: + setattr(plugin.module.config, config_entry[0], config_entry[1]) if error: sys.exit(1) - sys.dont_write_bytecode = False - logger = MetaLogger([ logger.module for logger in modules["logger"] ]) - logger.info("Successfully loaded plugins") - arguments = parse_arguments() + # Initialise plugins + for plugin in plugins_flat: + setattr(plugin.module, "logger", logger) + if hasattr(plugin.module, "init"): + plugin.module.init() - config = Config(config_directory) - - contents = b'' - target_file_extension = "" - if arguments.latest or arguments.file != None: - file_path = "" - if arguments.latest: - if config.source_directory == "": - print("Option 'latest' requires source directory to be specified") - sys.exit(1) - file_path = _latest(config.source_directory) - else: - file_path = arguments.file - print(f"Uploading file '{file_path}'") - - with open(file_path, mode="rb") as file: - contents = file.read() - - (_, target_file_extension) = os.path.splitext(file_path) - elif arguments.paste: - print("Uploading contents of clipboard") - contents = pyclip.paste() - target_file_extension = ".txt" - else: - print("Error: must specify one of -f FILE, -l, -p") - sys.exit(1) - - target_id = rebase(time.time_ns()) - target_file_name = f"{target_id}{target_file_extension}" - target_file = f"{config.ssh_path}/{target_file_name}" - target_destination = f"{config.ssh_user}@{config.host_name}" - print(f"Uploading to host: {target_destination}, port: {config.ssh_port}, file: {target_file}") - process = subprocess.run([ - "ssh", - f"-p {config.ssh_port}", - target_destination, - "-T", - f"cat - > {target_file}" - ], - input = contents, - ) - if process.returncode != 0: - print("Error: failed to upload file") + # Sort plugins by type + error = False + for plugin in plugins_flat: + if isinstance(plugin.module.plugin_type, str): + plugin.module.plugin_type = [ plugin.module.plugin_type ] + for plugin_type in plugin.module.plugin_type: + plugins_of_type = plugins.get(plugin_type) + if plugins_of_type == None: + logger.error(f"Error: Plugin '{plugin.name}' has an invalid plugin type '{plugin_type}'") + error = True + else: + plugins_of_type.append(plugin) + if error: sys.exit(1) - target_url = f"{config.host_protocol}://{config.host_name}{config.host_port}{config.host_path}/{target_file_name}" - print(f"File available at '{target_url}'") - if arguments.copy: - pyclip.copy(target_url) - print("URL copied to clipboard") + logger = Congloggerate([ logger.module for logger in plugins["logger"] ]) + + sources = [] + for plugin in plugins["source"]: + sources.append(plugin.module.get()) + + for index, source in enumerate(sources): + name = "" + for plugin in plugins["name"]: + name = plugin.module.name(name, source) + sources[index] = name, source + + for (name, source) in sources: + for plugin in plugins["upload"]: + plugin.module.upload(name, source) sys.exit(0) - def parse_arguments(): parser = argparse.ArgumentParser( prog = "SSHare", diff --git a/src/sshare/plugins/config.py b/src/sshare/plugins/config.py new file mode 100644 index 0000000..79199ad --- /dev/null +++ b/src/sshare/plugins/config.py @@ -0,0 +1,27 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +class NoDefault: + def __init__(self, flags=None): + self.flags = flags + +class Default: + def __init__(self, value, flags=None): + self.value = value + self.flags = flags + +class Flags: + def __init__(self, short=None, long=None): + self.short = short + self.long = long diff --git a/src/sshare/plugins/default/append_type.py b/src/sshare/plugins/default/append_type.py new file mode 100644 index 0000000..8d65e3e --- /dev/null +++ b/src/sshare/plugins/default/append_type.py @@ -0,0 +1,34 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +from ..source import File +from ..source import Raw + +plugin_type = "name" + +def name(name, source): + if isinstance(source, File): + if source.path.is_dir(): + return name + else: + start = 1 + components = source.path.name.split(".") + if components[0] == "": + start += 1 + if start > len(components): + return name + else: + return name + "." + ".".join(components[start:]) + elif isinstance(source, Raw): + return name + f".{source.type}" diff --git a/src/sshare/command_line_logger.py b/src/sshare/plugins/default/command_line.py similarity index 100% rename from src/sshare/command_line_logger.py rename to src/sshare/plugins/default/command_line.py diff --git a/src/sshare/plugins/default/current_time.py b/src/sshare/plugins/default/current_time.py new file mode 100644 index 0000000..1c37444 --- /dev/null +++ b/src/sshare/plugins/default/current_time.py @@ -0,0 +1,55 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +import time + +from ..config import Default +from ..config import Flags +from ..source import File + +plugin_type = "name" + +config = { + "base": 62, +} + +def init(): + if not isinstance(config.base, int): + logger.fatal("Error: 'base' must be an integer") + elif config.base < 2: + logger.fatal("Error: 'base' cannot be less than 2") + elif config.base > 62: + logger.fatal("Error: 'base' cannot be greater than 62") + +def name(name, source): + return name + _rebase(config.base, time.time_ns()) + +def _rebase(base, number): + if number == 0: + return "0" + if base == 10: + return f"{number}" + rebased = "" + while number != 0: + rebased = _number_to_char(number % base) + rebased + number = int(number / base) + return rebased + +def _number_to_char(number): + if number < 10: + return chr(number + 48) + elif number < 36: + return chr(number + 87) + else: + return chr(number + 29) diff --git a/src/sshare/plugins/default/file.py b/src/sshare/plugins/default/file.py new file mode 100644 index 0000000..36bcc50 --- /dev/null +++ b/src/sshare/plugins/default/file.py @@ -0,0 +1,36 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +from ..config import NoDefault +from ..config import Flags +from ..source import File + +plugin_type = "source" + +config = { + "file": NoDefault( + flags=Flags( + short="f", + long="file" + ) + ), +} + +def get(): + file = File(config.file) + if file.path.is_dir(): + logger.info(f"Uploading directory '{config.file}'") + else: + logger.info(f"Uploading file '{config.file}'") + return file diff --git a/src/sshare/plugins/default/ssh.py b/src/sshare/plugins/default/ssh.py new file mode 100644 index 0000000..b85819b --- /dev/null +++ b/src/sshare/plugins/default/ssh.py @@ -0,0 +1,64 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +import getpass +import subprocess + +from ..config import Default +from ..config import NoDefault +from ..config import Flags +from ..source import File +from ..source import Raw + +plugin_type = "upload" + +config = { + "host": NoDefault(), + "path": NoDefault(), + "port": 22, + "user": getpass.getuser(), +} + +def upload(name, source): + logger.info(f"Uploading to {config.user}@{config.host}/{config.port}:{config.path}/{name}") + if isinstance(source, File): + command = [ + "scp", + ] + ([ + "-r", + ] if source.path.is_dir() else []) + [ + "-P", f"{config.port}", + source.path, + f"{config.user}@{config.host}:{config.path}/{name}", + ] + process = subprocess.run(command) + if process.returncode != 0: + if source.path.is_dir(): + logger.fatal("Error: failed to upload directory") + else: + logger.fatal("Error: failed to upload file") + elif isinstance(source, Raw): + command = [ + "ssh", + f"-p {config.port}", + f"{config.user}@{config.host}", + "-T", + f"cat - > {config.path}/{name}" + ] + process = subprocess.run( + command, + input = source.data, + ) + if process.returncode != 0: + logger.fatal("Error: failed to upload data") diff --git a/src/sshare/plugins/source.py b/src/sshare/plugins/source.py new file mode 100644 index 0000000..fcfc6a6 --- /dev/null +++ b/src/sshare/plugins/source.py @@ -0,0 +1,24 @@ +# This file is part of SSHare. +# +# SSHare is free software: you can redistribute it and/or modify it under the terms of +# the GNU General Public License as published by the Free Software Foundation, +# either version 3 of the License, or (at your option) any later version. +# +# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +# more details. +# +# You should have received a copy of the GNU General Public License along with +# SSHare. If not, see . + +from pathlib import Path + +class Raw: + def __init__(self, type, data): + self.type = type + self.data = data + +class File: + def __init__(self, path): + self.path = Path(path)