Reorganised to use a plugin system

This commit is contained in:
Gnarwhal 2024-08-31 05:42:26 +00:00
parent 98c490dab9
commit 7b8fbf672b
Signed by: Gnarwhal
GPG key ID: 0989A73D8C421174
8 changed files with 337 additions and 128 deletions

View file

@ -26,69 +26,11 @@ import sys
from pathlib import Path from pathlib import Path
from version import version from version import version
class Config: from plugins.config import Default
def __init__(self, config_directory): from plugins.config import NoDefault
_config = self._load_from_file(config_directory / "config.toml") from plugins.config import Flags
self.source_directory = _config.get("source_directory") class Congloggerate:
host = _config.get("host")
if host == None:
print("Error: 'host' cannot be 'None'")
sys.exit(1)
self.host_protocol = host.get("protocol")
self.host_name = host.get("name")
self.host_port = host.get("port")
self.host_path = host.get("path")
if self.host_protocol == None:
self.host_protocol = "https"
if self.host_name == None:
print("Error: 'host.name' cannot be 'None'")
sys.exit(1)
if self.host_port == None:
self.host_port = ""
else:
self.host_port = f":{self.host_port}"
if self.host_path == None:
self.host_path = ""
ssh = _config.get("ssh")
if ssh == None:
print("Error: 'ssh' cannot be 'None'")
sys.exit(1)
self.ssh_port = ssh.get("port")
self.ssh_user = ssh.get("user")
self.ssh_path = ssh.get("path")
if self.ssh_port == None:
self.ssh_port = 22
if self.ssh_user == None:
self.ssh_user = getpass.getuser()
if self.ssh_path == None:
print("Error: 'ssh.path' cannot be 'None'")
sys.exit(1)
def _load_from_file(self, config_path):
with open(config_path, mode="rb") as file:
return tomllib.load(file)
def rebase(number):
if number == 0:
return "0"
rebased = ""
while number != 0:
digit = number % 62
if digit < 10:
rebased = chr(digit + 48) + rebased
elif digit < 36:
rebased = chr(digit + 87) + rebased
else:
rebased = chr(digit + 29) + rebased
number = int(number / 62)
return rebased
class MetaLogger:
def __init__(self, loggers): def __init__(self, loggers):
def info(message): def info(message):
for logger in loggers: for logger in loggers:
@ -106,6 +48,14 @@ class MetaLogger:
self.warn = warn self.warn = warn
self.error = error self.error = error
fatalicize(self)
def fatalicize(logger):
def fatal(message):
logger.error(message)
sys.exit(1)
setattr(logger, "fatal", fatal)
class Plugin: class Plugin:
def __init__(self, name, module): def __init__(self, name, module):
self.name = name self.name = name
@ -113,12 +63,32 @@ class Plugin:
def main(): def main():
config_directory = Path(os.environ.get("XDG_CONFIG_DIR") or f"{os.environ["HOME"]}/.config") / "sshare" config_directory = Path(os.environ.get("XDG_CONFIG_DIR") or f"{os.environ["HOME"]}/.config") / "sshare"
logger = importlib.import_module("command_line_logger") with open(config_directory / "config.toml", mode="rb") as file:
modules = { config = tomllib.load(file)
"logger": [ Plugin("command_line_logger", logger) ],
"data": [], # Load command line early and set it as the active logger
# so that it can be used to report errors while loading and
# configuring plugins
# i.e. before other logging plugins have had a chance to be initialised
logger = importlib.import_module("plugins.default.command_line")
fatalicize(logger)
# Load inbuilt plugins
plugins_flat = [
Plugin("command_line", logger),
Plugin("file", importlib.import_module("plugins.default.file")),
Plugin("current_time", importlib.import_module("plugins.default.current_time")),
Plugin("append_type", importlib.import_module("plugins.default.append_type")),
Plugin("ssh", importlib.import_module("plugins.default.ssh")),
]
plugins = {
"logger": [],
"source": [],
"name": [],
"upload": [],
} }
error = False
# Load external plugins
sys.dont_write_bytecode = True sys.dont_write_bytecode = True
for plugin in (config_directory / "plugins").iterdir(): for plugin in (config_directory / "plugins").iterdir():
if plugin.is_file(): if plugin.is_file():
@ -128,77 +98,76 @@ def main():
) )
module = importlib.util.module_from_spec(module_spec) module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module) module_spec.loader.exec_module(module)
plugins_flat.append(Plugin(plugin.stem, module))
sys.dont_write_bytecode = False
modules_of_type = modules.get(module.plugin_type) # Set plugin configurations from config file
if modules_of_type == None: if config.get("plugins") == None:
logger.error(f"Error: Plugin '{plugin.stem}' has an invalid plugin type '{module.plugin_type}'") config["plugins"] = {}
error = True for plugin in plugins_flat:
else: if hasattr(plugin.module, "config"):
modules_of_type.append(Plugin(plugin.stem, module)) plugin_config = config["plugins"].get(plugin.name)
if hasattr(module, "init"): if plugin_config != None:
module.init() for config_entry in plugin_config.items():
plugin.module.config[config_entry[0]] = Default(config_entry[1])
# Flatten plugin configs
class PluginConfig: pass
error = False
for plugin in plugins_flat:
if hasattr(plugin.module, "config"):
config = plugin.module.config
plugin.module.config = PluginConfig()
for config_entry in config.items():
if isinstance(config_entry[1], NoDefault):
logger.error(f"{plugin.name} > Error: Value '{config_entry[0]}' has no default value and must be specified explicitly")
error = True
elif isinstance(config_entry[1], Default):
setattr(plugin.module.config, config_entry[0], config_entry[1].value)
else:
setattr(plugin.module.config, config_entry[0], config_entry[1])
if error: if error:
sys.exit(1) sys.exit(1)
sys.dont_write_bytecode = False
logger = MetaLogger([ logger.module for logger in modules["logger"] ])
logger.info("Successfully loaded plugins")
arguments = parse_arguments() # Initialise plugins
for plugin in plugins_flat:
setattr(plugin.module, "logger", logger)
if hasattr(plugin.module, "init"):
plugin.module.init()
config = Config(config_directory) # Sort plugins by type
error = False
contents = b'' for plugin in plugins_flat:
target_file_extension = "" if isinstance(plugin.module.plugin_type, str):
if arguments.latest or arguments.file != None: plugin.module.plugin_type = [ plugin.module.plugin_type ]
file_path = "" for plugin_type in plugin.module.plugin_type:
if arguments.latest: plugins_of_type = plugins.get(plugin_type)
if config.source_directory == "": if plugins_of_type == None:
print("Option 'latest' requires source directory to be specified") logger.error(f"Error: Plugin '{plugin.name}' has an invalid plugin type '{plugin_type}'")
sys.exit(1) error = True
file_path = _latest(config.source_directory) else:
else: plugins_of_type.append(plugin)
file_path = arguments.file if error:
print(f"Uploading file '{file_path}'")
with open(file_path, mode="rb") as file:
contents = file.read()
(_, target_file_extension) = os.path.splitext(file_path)
elif arguments.paste:
print("Uploading contents of clipboard")
contents = pyclip.paste()
target_file_extension = ".txt"
else:
print("Error: must specify one of -f FILE, -l, -p")
sys.exit(1)
target_id = rebase(time.time_ns())
target_file_name = f"{target_id}{target_file_extension}"
target_file = f"{config.ssh_path}/{target_file_name}"
target_destination = f"{config.ssh_user}@{config.host_name}"
print(f"Uploading to host: {target_destination}, port: {config.ssh_port}, file: {target_file}")
process = subprocess.run([
"ssh",
f"-p {config.ssh_port}",
target_destination,
"-T",
f"cat - > {target_file}"
],
input = contents,
)
if process.returncode != 0:
print("Error: failed to upload file")
sys.exit(1) sys.exit(1)
target_url = f"{config.host_protocol}://{config.host_name}{config.host_port}{config.host_path}/{target_file_name}" logger = Congloggerate([ logger.module for logger in plugins["logger"] ])
print(f"File available at '{target_url}'")
if arguments.copy: sources = []
pyclip.copy(target_url) for plugin in plugins["source"]:
print("URL copied to clipboard") sources.append(plugin.module.get())
for index, source in enumerate(sources):
name = ""
for plugin in plugins["name"]:
name = plugin.module.name(name, source)
sources[index] = name, source
for (name, source) in sources:
for plugin in plugins["upload"]:
plugin.module.upload(name, source)
sys.exit(0) sys.exit(0)
def parse_arguments(): def parse_arguments():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog = "SSHare", prog = "SSHare",

View file

@ -0,0 +1,27 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
class NoDefault:
def __init__(self, flags=None):
self.flags = flags
class Default:
def __init__(self, value, flags=None):
self.value = value
self.flags = flags
class Flags:
def __init__(self, short=None, long=None):
self.short = short
self.long = long

View file

@ -0,0 +1,34 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from ..source import File
from ..source import Raw
plugin_type = "name"
def name(name, source):
if isinstance(source, File):
if source.path.is_dir():
return name
else:
start = 1
components = source.path.name.split(".")
if components[0] == "":
start += 1
if start > len(components):
return name
else:
return name + "." + ".".join(components[start:])
elif isinstance(source, Raw):
return name + f".{source.type}"

View file

@ -0,0 +1,55 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import time
from ..config import Default
from ..config import Flags
from ..source import File
plugin_type = "name"
config = {
"base": 62,
}
def init():
if not isinstance(config.base, int):
logger.fatal("Error: 'base' must be an integer")
elif config.base < 2:
logger.fatal("Error: 'base' cannot be less than 2")
elif config.base > 62:
logger.fatal("Error: 'base' cannot be greater than 62")
def name(name, source):
return name + _rebase(config.base, time.time_ns())
def _rebase(base, number):
if number == 0:
return "0"
if base == 10:
return f"{number}"
rebased = ""
while number != 0:
rebased = _number_to_char(number % base) + rebased
number = int(number / base)
return rebased
def _number_to_char(number):
if number < 10:
return chr(number + 48)
elif number < 36:
return chr(number + 87)
else:
return chr(number + 29)

View file

@ -0,0 +1,36 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from ..config import NoDefault
from ..config import Flags
from ..source import File
plugin_type = "source"
config = {
"file": NoDefault(
flags=Flags(
short="f",
long="file"
)
),
}
def get():
file = File(config.file)
if file.path.is_dir():
logger.info(f"Uploading directory '{config.file}'")
else:
logger.info(f"Uploading file '{config.file}'")
return file

View file

@ -0,0 +1,64 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import getpass
import subprocess
from ..config import Default
from ..config import NoDefault
from ..config import Flags
from ..source import File
from ..source import Raw
plugin_type = "upload"
config = {
"host": NoDefault(),
"path": NoDefault(),
"port": 22,
"user": getpass.getuser(),
}
def upload(name, source):
logger.info(f"Uploading to {config.user}@{config.host}/{config.port}:{config.path}/{name}")
if isinstance(source, File):
command = [
"scp",
] + ([
"-r",
] if source.path.is_dir() else []) + [
"-P", f"{config.port}",
source.path,
f"{config.user}@{config.host}:{config.path}/{name}",
]
process = subprocess.run(command)
if process.returncode != 0:
if source.path.is_dir():
logger.fatal("Error: failed to upload directory")
else:
logger.fatal("Error: failed to upload file")
elif isinstance(source, Raw):
command = [
"ssh",
f"-p {config.port}",
f"{config.user}@{config.host}",
"-T",
f"cat - > {config.path}/{name}"
]
process = subprocess.run(
command,
input = source.data,
)
if process.returncode != 0:
logger.fatal("Error: failed to upload data")

View file

@ -0,0 +1,24 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
class Raw:
def __init__(self, type, data):
self.type = type
self.data = data
class File:
def __init__(self, path):
self.path = Path(path)