Compare commits

...

5 commits

Author SHA1 Message Date
0384f9080d
Arguments are working :fingers_crossed: 2024-08-31 17:29:46 +00:00
7b8fbf672b
Reorganised to use a plugin system 2024-08-31 05:43:30 +00:00
98c490dab9
Plugins are loading 2024-08-28 20:23:52 +00:00
72627995a7
Loaded a module :D 2024-08-28 16:56:05 +00:00
c1a455d7a4
Bump version 2024-08-28 06:43:21 +00:00
13 changed files with 599 additions and 205 deletions

1
hello.txt Normal file
View file

@ -0,0 +1 @@
Hello World!

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "sshare" name = "sshare"
version = "1.1.0" version = "2.0.0"
authors = [ authors = [
{ name = "Gnarwhal", email = "git.aspect893@passmail.net" }, { name = "Gnarwhal", email = "git.aspect893@passmail.net" },
] ]
@ -22,9 +22,9 @@ dependencies = [
[project.urls] [project.urls]
Homepage = "https://forge.monodon.me/Gnarwhal/sshare" Homepage = "https://forge.monodon.me/Gnarwhal/sshare"
Documentation= "https://forge.monodon.me/Gnarwhal/sshare/README.md#Usage" Documentation = "https://forge.monodon.me/Gnarwhal/sshare/README.md#Usage"
Repository = "https://forge.monodon.me/Gnarwhal/sshare" Repository = "https://forge.monodon.me/Gnarwhal/sshare"
Issues = "https://forge.monodon.me/Gnarwhal/sshare/issues" Issues = "https://forge.monodon.me/Gnarwhal/sshare/issues"
[project.scripts] [project.scripts]
sshare = "sshare.cli:main" sshare = "sshare.main:main"

View file

@ -12,6 +12,6 @@
# You should have received a copy of the GNU General Public License along with # You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>. # SSHare. If not, see <https://www.gnu.org/licenses/>.
from cli import main from main import main
main() main()

View file

@ -1,200 +0,0 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import argparse
import getpass
import os
import os.path
import pyclip
import time
import tomllib
import subprocess
import sys
from sshare.version import version
class Config:
def __init__(self):
config_directory = os.environ.get("XDG_CONFIG_DIR")
if config_directory == None:
config_directory = f"{os.environ["HOME"]}/.config"
_config = self._load_from_file(f"{config_directory}/sshare/config.toml")
self.source_directory = _config.get("source_directory")
host = _config.get("host")
if host == None:
print("Error: 'host' cannot be 'None'")
sys.exit(1)
self.host_protocol = host.get("protocol")
self.host_name = host.get("name")
self.host_port = host.get("port")
self.host_path = host.get("path")
if self.host_protocol == None:
self.host_protocol = "https"
if self.host_name == None:
print("Error: 'host.name' cannot be 'None'")
sys.exit(1)
if self.host_port == None:
self.host_port = ""
else:
self.host_port = f":{self.host_port}"
if self.host_path == None:
self.host_path = ""
ssh = _config.get("ssh")
if ssh == None:
print("Error: 'ssh' cannot be 'None'")
sys.exit(1)
self.ssh_port = ssh.get("port")
self.ssh_user = ssh.get("user")
self.ssh_path = ssh.get("path")
if self.ssh_port == None:
self.ssh_port = 22
if self.ssh_user == None:
self.ssh_user = getpass.getuser()
if self.ssh_path == None:
print("Error: 'ssh.path' cannot be 'None'")
sys.exit(1)
def _load_from_file(self, config_path):
with open(config_path, mode="rb") as file:
return tomllib.load(file)
def rebase(number):
if number == 0:
return "0"
rebased = ""
while number != 0:
digit = number % 62
if digit < 10:
rebased = chr(digit + 48) + rebased
elif digit < 36:
rebased = chr(digit + 87) + rebased
else:
rebased = chr(digit + 29) + rebased
number = int(number / 62)
return rebased
def main():
arguments = parse_arguments()
config = Config()
contents = b''
target_file_extension = ""
if arguments.latest or arguments.file != None:
file_path = ""
if arguments.latest:
if config.source_directory == "":
print("Option 'latest' requires source directory to be specified")
sys.exit(1)
file_path = _latest(config.source_directory)
else:
file_path = arguments.file
print(f"Uploading file '{file_path}'")
with open(file_path, mode="rb") as file:
contents = file.read()
(_, target_file_extension) = os.path.splitext(file_path)
elif arguments.paste:
print("Uploading contents of clipboard")
contents = pyclip.paste()
target_file_extension = ".txt"
else:
print("Error: must specify one of -f FILE, -l, -p")
sys.exit(1)
target_id = rebase(time.time_ns())
target_file_name = f"{target_id}{target_file_extension}"
target_file = f"{config.ssh_path}/{target_file_name}"
target_destination = f"{config.ssh_user}@{config.host_name}"
print(f"Uploading to host: {target_destination}, port: {config.ssh_port}, file: {target_file}")
process = subprocess.run([
"ssh",
f"-p {config.ssh_port}",
target_destination,
"-T",
f"cat - > {target_file}"
],
input = contents,
)
if process.returncode != 0:
print("Error: failed to upload file")
sys.exit(1)
target_url = f"{config.host_protocol}://{config.host_name}{config.host_port}{config.host_path}/{target_file_name}"
print(f"File available at '{target_url}'")
if arguments.copy:
pyclip.copy(target_url)
print("URL copied to clipboard")
sys.exit(0)
def parse_arguments():
parser = argparse.ArgumentParser(
prog = "SSHare",
description = "Upload files to a server via ssh",
)
parser.add_argument(
"-v",
"--version",
action="version",
version=f"%(prog)s version {version}",
)
parser.add_argument(
"-l",
"--latest",
action="store_const",
const=True,
help="Upload the latest image from the source directory",
)
parser.add_argument(
"-p",
"--paste",
action="store_const",
const=True,
help="Upload the contents of the clipboard as a .txt file",
)
parser.add_argument(
"-f",
"--file",
help="Upload a file",
)
parser.add_argument(
"-c",
"--copy",
action="store_const",
const=True,
help="Copy the resultant URL to the clipboard",
)
arguments = parser.parse_args()
return arguments
def _latest(directory, key=os.path.getmtime):
files = map(lambda file: f"{directory}/{file}", os.listdir(directory))
selection = next(files)
selection_key = key(selection)
for file in files:
new_key = key(file)
if new_key > selection_key:
selection = file
selection_key = key
return selection

275
src/sshare/main.py Normal file
View file

@ -0,0 +1,275 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import argparse
import getpass
import importlib
import importlib.util
import os
import os.path
import pyclip
import time
import tomllib
import subprocess
import sys
from pathlib import Path
from version import version
from plugins.config import NoDefault
class Congloggerate:
def __init__(self, loggers):
def info(message):
for logger in loggers:
logger.info(message)
def warn(message):
for logger in loggers:
logger.warn(message)
def error(message):
for logger in loggers:
logger.error(message)
self.info = info
self.warn = warn
self.error = error
fatalicize(self)
def fatalicize(logger):
def fatal(message):
logger.error(message)
sys.exit(1)
setattr(logger, "fatal", fatal)
class Plugin:
def __init__(self, name, module):
self.name = name
self.module = module
def main():
config_directory = Path(os.environ.get("XDG_CONFIG_DIR") or f"{os.environ["HOME"]}/.config") / "sshare"
with open(config_directory / "config.toml", mode="rb") as file:
config = tomllib.load(file)
# Load command line early and set it as the active logger
# so that it can be used to report errors while loading and
# configuring plugins
# i.e. before other logging plugins have had a chance to be initialised
logger = importlib.import_module("plugins.default.command_line")
fatalicize(logger)
# Load inbuilt plugins
plugins_flat = [
Plugin("command_line", logger),
Plugin("file", importlib.import_module("plugins.default.file")),
Plugin("current_time", importlib.import_module("plugins.default.current_time")),
Plugin("append_type", importlib.import_module("plugins.default.append_type")),
Plugin("ssh", importlib.import_module("plugins.default.ssh")),
]
plugins = {}
for type in [ "logger", "source", "name", "upload" ]:
plugins[type] = { "active": [], "inactive": [] }
# Load external plugins
sys.dont_write_bytecode = True
for plugin in (config_directory / "plugins").iterdir():
if plugin.is_file():
module_spec = importlib.util.spec_from_file_location(
plugin.stem,
plugin.as_posix(),
)
module = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(module)
plugins_flat.append(Plugin(plugin.stem, module))
sys.dont_write_bytecode = False
# Set plugin configurations from config file
# Load plugin arguments and detect conflicts
error = False
argument_map = {}
used_arguments = {}
parser = argparse.ArgumentParser(
prog = "SSHare",
description = "Upload files to a server via ssh",
)
parser.add_argument(
"-v",
"--version",
action="version",
version=f"%(prog)s version {version}",
)
if config.get("plugins") == None:
config["plugins"] = {}
for plugin in plugins_flat:
if hasattr(plugin.module, "config"):
plugin_config = config["plugins"].get(plugin.name)
if plugin_config != None:
for config_entry in plugin_config.items():
plugin.module.config[config_entry[0]] = config_entry[1]
else:
setattr(plugin.module, "config", {})
if hasattr(plugin.module, "args"):
for arg_name, arg in plugin.module.args.items():
if arg.is_valid():
arg.set_for_plugin(plugin)
def check_flag(flag):
if flag in used_arguments:
logger.error(f"Error: Argument '{arg_name}' for plugin '{plugin.name}' has conflict. Flag '{flag}' is also used by plugin '{used_arguments[arg.short]}'")
error = True
check_flag(arg.short)
check_flag(arg.long)
arg.add(parser, used_arguments)
argument_map[arg.dest] = plugin, arg_name
else:
logger.error(f"Error: Argument '{arg_name}' must set either one or both of short and long flag parameters")
error = True
if error:
sys.exit(1)
arguments = parser.parse_args()
for arg, (plugin, config) in list(argument_map.items()):
if getattr(arguments, arg):
plugin.module.config[config] = getattr(arguments, arg)
del argument_map[arg]
# Sort plugins by type and check activation criteria
error = False
for plugin in plugins_flat:
if isinstance(plugin.module.plugin_type, str):
plugin.module.plugin_type = [ plugin.module.plugin_type ]
for plugin_type in plugin.module.plugin_type:
plugins_of_type = plugins.get(plugin_type)
if plugins_of_type == None:
logger.error(f"Error: Plugin '{plugin.name}' has an invalid plugin type '{plugin_type}'")
error = True
else:
active = True
if hasattr(plugin.module, "activate"):
criteria = plugin.module.activate
if isinstance(plugin.module.activate, dict):
criteria = plugin.module.activate.get(plugin_type)
if criteria != None:
for criterion in criteria:
active = not plugin.module.args[criterion].dest in argument_map
if not active:
break
plugins_of_type["active" if active else "inactive"].append(plugin)
for plugin_type, plugins_of_type in plugins.items():
if len(plugins_of_type["active"]) == 0 and plugin_type != "logger":
if len(plugins_of_type["inactive"]) == 0:
logger.error(f"No '{plugin_type}' plugins available. Atleast one must be provided")
else:
logger.error(f"No '{plugin_type}' plugins activated. Activate at least one of:")
for plugin in plugins_of_type["inactive"]:
logger.error(f"{plugin.name}:")
criteria = plugin.module.activate
if isinstance(plugin.module.activate, dict):
criteria = plugin.module.activate[plugin_type]
for criterion in criteria:
logger.error(f" {plugin.module.args[criterion].pretty()}")
error = True
if error:
sys.exit(1)
# Flatten plugin configs
error = False
class PluginConfig: pass
for plugin in plugins_flat:
if hasattr(plugin.module, "config"):
config = plugin.module.config
plugin.module.config = PluginConfig()
for config_entry in config.items():
if config_entry[1] == NoDefault:
logger.error(f"Error: Value 'plugins.{plugin.name}.{config_entry[0]}' has no default value and must be specified explicitly")
error = True
else:
setattr(plugin.module.config, config_entry[0], config_entry[1])
if error:
sys.exit(1)
# Initialise plugins
for plugin in plugins_flat:
setattr(plugin.module, "logger", logger)
if hasattr(plugin.module, "init"):
error = error or plugin.module.init()
logger = Congloggerate([ logger.module for logger in plugins["logger"]["active"] ])
sources = []
for plugin in plugins["source"]["active"]:
sources.append(plugin.module.get())
if len(sources) == 0:
logger.error("Error: No sources provided. Must activate at least one source plugin")
log_activations(logger, plugins["source"])
for index, source in enumerate(sources):
name = ""
for plugin in plugins["name"]["active"]:
name = plugin.module.name(name, source)
sources[index] = name, source
for (name, source) in sources:
for plugin in plugins["upload"]["active"]:
plugin.module.upload(name, source)
sys.exit(0)
def parse_arguments():
parser = argparse.ArgumentParser(
prog = "SSHare",
description = "Upload files to a server via ssh",
)
parser.add_argument(
"-l",
"--latest",
action="store_const",
const=True,
help="Upload the latest image from the source directory",
)
parser.add_argument(
"-p",
"--paste",
action="store_const",
const=True,
help="Upload the contents of the clipboard as a .txt file",
)
parser.add_argument(
"-f",
"--file",
help="Upload a file",
)
parser.add_argument(
"-c",
"--copy",
action="store_const",
const=True,
help="Copy the resultant URL to the clipboard",
)
return arguments
def _latest(directory, key=os.path.getmtime):
files = map(lambda file: f"{directory}/{file}", os.listdir(directory))
selection = next(files)
selection_key = key(selection)
for file in files:
new_key = key(file)
if new_key > selection_key:
selection = file
selection_key = key
return selection

View file

@ -0,0 +1,74 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
class NoDefault: pass
class Argument:
def __init__(self,
short=None,
long=None,
action='store',
nargs=None,
const=None,
default=None,
type=str,
choices=None,
required=False,
help=None,
metavar=None):
self.short = short
self.long = long
self.action = action
self.nargs = nargs
self.const = const
self.default = default
self.type = type
self.choices = choices
self.help = help
self.metavar = metavar or self.long or self.short
def is_valid(self):
return (self.short != None and self.short != "") or (self.long != None and self.long != "")
def pretty(self):
if self.short and self.long:
pretty = f"-{self.short}, --{self.long}"
elif self.long:
pretty = f"--{self.long}"
else:
pretty = f"-{self.short}"
return pretty + f" {self.help}"
def set_for_plugin(self, plugin):
self.plugin = plugin
self.dest = f"{plugin.name}_{self.metavar}"
def add(self, parser, used_arguments):
parser.add_argument(
f"-{self.short}",
f"--{self.long}",
action=self.action,
nargs=self.nargs,
const=self.const,
default=self.default,
type=self.type,
choices=self.choices,
help=self.help,
metavar=self.metavar,
dest=self.dest,
)
if self.short:
used_arguments["short"] = self.plugin
if self.long:
used_arguments["long"] = self.plugin

View file

@ -0,0 +1,34 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from ..source import File
from ..source import Raw
plugin_type = "name"
def name(name, source):
if isinstance(source, File):
if source.path.is_dir():
return name
else:
start = 1
components = source.path.name.split(".")
if components[0] == "":
start += 1
if start > len(components):
return name
else:
return name + "." + ".".join(components[start:])
elif isinstance(source, Raw):
return name + f".{source.type}"

View file

@ -0,0 +1,27 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
plugin_type = "logger"
def _print_with_color(color, message):
print(f"\033[{color}m{message}\033[0m")
def info(message):
_print_with_color(0, message)
def warn(message):
_print_with_color(93, message)
def error(message):
_print_with_color(91, message)

View file

@ -0,0 +1,61 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import time
from ..config import Argument
from ..source import File
plugin_type = "name"
config = {
"base": 62,
}
args = {
"base": Argument(
short="b",
long="base",
help="Set the numeric base to use for the current time"
)
}
def init():
if not isinstance(config.base, int):
logger.fatal("Error: 'base' must be an integer")
elif config.base < 2:
logger.fatal("Error: 'base' cannot be less than 2")
elif config.base > 62:
logger.fatal("Error: 'base' cannot be greater than 62")
def name(name, source):
return name + _rebase(config.base, time.time_ns())
def _rebase(base, number):
if number == 0:
return "0"
if base == 10:
return f"{number}"
rebased = ""
while number != 0:
rebased = _number_to_char(number % base) + rebased
number = int(number / base)
return rebased
def _number_to_char(number):
if number < 10:
return chr(number + 48)
elif number < 36:
return chr(number + 87)
else:
return chr(number + 29)

View file

@ -0,0 +1,36 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from ..config import Argument
from ..config import NoDefault
from ..source import File
plugin_type = "source"
activate = [ "file" ]
args = {
"file": Argument(
short="f",
long="file",
help="Upload a file"
)
}
def get():
file = File(config.file)
if file.path.is_dir():
logger.info(f"Uploading directory '{config.file}'")
else:
logger.info(f"Uploading file '{config.file}'")
return file

View file

@ -0,0 +1,62 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
import getpass
import subprocess
from ..config import NoDefault
from ..source import File
from ..source import Raw
plugin_type = "upload"
config = {
"host": NoDefault,
"path": NoDefault,
"port": 22,
"user": getpass.getuser(),
}
def upload(name, source):
logger.info(f"Uploading to {config.user}@{config.host}:{config.path}/{name} on port {config.port}")
if isinstance(source, File):
command = [
"scp",
] + ([
"-r",
] if source.path.is_dir() else []) + [
"-P", f"{config.port}",
source.path,
f"{config.user}@{config.host}:{config.path}/{name}",
]
process = subprocess.run(command)
if process.returncode != 0:
if source.path.is_dir():
logger.fatal("Error: failed to upload directory")
else:
logger.fatal("Error: failed to upload file")
elif isinstance(source, Raw):
command = [
"ssh",
f"-p {config.port}",
f"{config.user}@{config.host}",
"-T",
f"cat - > {config.path}/{name}"
]
process = subprocess.run(
command,
input = source.data,
)
if process.returncode != 0:
logger.fatal("Error: failed to upload data")

View file

@ -0,0 +1,24 @@
# This file is part of SSHare.
#
# SSHare is free software: you can redistribute it and/or modify it under the terms of
# the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
#
# SSHare is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# SSHare. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
class Raw:
def __init__(self, type, data):
self.type = type
self.data = data
class File:
def __init__(self, path):
self.path = Path(path)

View file

@ -1 +1 @@
version = "1.1.0" version = "2.0.0"