"""Build an OpenVPN client certificate and render its ``.ovpn`` config.

The script loads the easy-rsa style variables from ``vars.bat``, creates a
key and certificate request with ``openssl req``, signs it with
``openssl ca`` and finally fills the ``template.ovpn`` placeholders
(``{{ca}}``, ``{{cert}}``, ``{{key}}``) to produce ``config/<name>.ovpn``.

It relies on ``cmd.exe`` and the ``oem`` codec, so it targets Windows.
"""

import argparse
import itertools
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from pprint import pprint
from typing import ClassVar, NamedTuple


class Filenames(NamedTuple):
    """Paths of every file involved in building one client's config."""

    ssl_config: Path  # OpenSSL configuration file (from KEY_CONFIG)
    cfg_tmpl: Path  # client config template containing {{...}} placeholders
    ca: Path  # file substituted for {{ca}}
    req: Path  # certificate request written by `openssl req`
    key: Path  # private key written by `openssl req`
    cert: Path  # certificate written by `openssl ca`
    config: Path  # rendered client config (the final output)

    @classmethod
    def for_client(cls, name: str) -> "Filenames":
        """Return the conventional set of paths for the client *name*.

        The OpenSSL config path is taken from the ``KEY_CONFIG`` environment
        variable (default ``openssl-1.0.0.cnf``), so call this after the
        environment has been populated.
        """
        keys_dir = Path("keys")
        return cls(
            Path(os.environ.get("KEY_CONFIG", "openssl-1.0.0.cnf")),
            Path("template.ovpn"),
            keys_dir / "ca.crt",
            keys_dir / f"{name}.csr",
            keys_dir / f"{name}.key",
            keys_dir / f"{name}.crt",
            Path("config") / f"{name}.ovpn",
        )

    def is_config_exists(self) -> bool:
        """Return True if the rendered client config already exists."""
        return self.config.exists()

    def is_cert_exists(self) -> bool:
        """Return True if the client certificate already exists."""
        return self.cert.exists()


@dataclass
class CertFiles:
    """File-backed view of a client's key material and config.

    The fields hold file names; the same-named properties without the
    ``_filename`` suffix read the file contents on every access (an absent
    file reads as an empty string).
    """

    # ClassVar keeps these out of the generated dataclass fields.
    # "oem" is a Windows-only codec.
    _encoding: ClassVar[str] = "oem"
    # Passed as the `-days` value to both openssl commands.
    _req_days: ClassVar[str] = "3650"

    ssl_config_filename: str
    cfg_tmpl_filename: str
    ca_filename: str
    req_filename: str
    key_filename: str
    cert_filename: str
    config_filename: str

    @classmethod
    def from_files(cls, files: Filenames) -> "CertFiles":
        """Build an instance from a :class:`Filenames` tuple."""
        return cls(
            str(files.ssl_config),
            str(files.cfg_tmpl),
            str(files.ca),
            str(files.req),
            str(files.key),
            str(files.cert),
            str(files.config),
        )

    @classmethod
    def _readfile(cls, filename) -> str:
        """Return the text of *filename*, or ``""`` if it does not exist."""
        # EAFP: avoids the exists()/open() race of a look-before-you-leap check.
        try:
            with open(filename, encoding=cls._encoding, mode="r") as f:
                return f.read()
        except FileNotFoundError:
            return ""

    @classmethod
    def _writefile(cls, filename, text) -> None:
        """Write *text* to *filename*, replacing any previous content."""
        with open(filename, encoding=cls._encoding, mode="w") as f:
            f.write(text)

    @property
    def cfg_tmpl(self) -> str:
        """Contents of the config template file."""
        return self._readfile(self.cfg_tmpl_filename)

    @property
    def ca(self) -> str:
        """Contents of the CA file."""
        return self._readfile(self.ca_filename)

    @property
    def req(self) -> str:
        """Contents of the certificate request file."""
        return self._readfile(self.req_filename)

    @property
    def key(self) -> str:
        """Contents of the private key file."""
        return self._readfile(self.key_filename)

    @property
    def cert(self) -> str:
        """Contents of the certificate file."""
        return self._readfile(self.cert_filename)

    @property
    def config(self) -> str:
        """Contents of the rendered client config; assignable to write it."""
        return self._readfile(self.config_filename)

    @config.setter
    def config(self, value) -> None:
        self._writefile(self.config_filename, value)

    def _request_command(self) -> list:
        """Return the argv for creating the key and certificate request."""
        return [
            "openssl", "req",
            "-days", self._req_days,
            "-nodes",
            "-new",
            "-keyout", self.key_filename,
            "-out", self.req_filename,
            "-config", self.ssl_config_filename,
            "-batch",
        ]

    def _sign_command(self) -> list:
        """Return the argv for signing the certificate request."""
        return [
            "openssl", "ca",
            "-days", self._req_days,
            "-out", self.cert_filename,
            "-in", self.req_filename,
            "-config", self.ssl_config_filename,
            "-batch",
        ]

    def request(self) -> None:
        """Create the key and certificate request unless a request exists.

        Raises:
            subprocess.CalledProcessError: if openssl exits with an error.
        """
        if self.req:
            return
        # Flush so the progress text appears before any openssl output.
        print("request", end="... ", flush=True)
        # An argument list (not a formatted string) keeps file names that
        # contain spaces intact.
        subprocess.run(self._request_command(), check=True)

    def sign(self) -> None:
        """Sign the certificate request unless a certificate exists.

        Raises:
            subprocess.CalledProcessError: if openssl exits with an error.
        """
        if self.cert:
            return
        print("sign", end="... ", flush=True)
        subprocess.run(self._sign_command(), check=True)

    def render_client_config(self) -> None:
        """Fill the template placeholders and write the client config."""
        substitutions = {
            "{{ca}}": self.ca,
            "{{cert}}": self.cert,
            "{{key}}": self.key,
        }
        rendered = self.cfg_tmpl
        for placeholder, content in substitutions.items():
            # Trailing newlines are dropped so the template controls spacing.
            rendered = rendered.replace(placeholder, content.rstrip("\n"))
        self.config = rendered


def make_validator(valid_keys=None):
    """Return a predicate accepting ``(key, value)`` pairs.

    A candidate is accepted when it has exactly two items and its first item
    is in *valid_keys*. When *valid_keys* is empty or None every well-formed
    pair is accepted. Sized candidates of the wrong length are reported on
    stderr; objects without a length are rejected silently.
    """
    valid = set(valid_keys or ())
    all_keys_are_valid = not valid

    def validate_pair(ob):
        try:
            is_pair = len(ob) == 2
        except TypeError:
            # Not a sized object at all, so it cannot be a pair.
            return False
        if not is_pair:
            print("Unexpected result:", ob, file=sys.stderr)
            return False
        return all_keys_are_valid or ob[0] in valid

    return validate_pair


def consume(iter):
    """Exhaust the iterator *iter*, discarding every item."""
    for _ in iter:
        pass


def get_environment_from_batch_command(env_cmd, initial=None, /, valid_keys=None, encoding="oem"):
    """
    Take a command (either a single command or list of arguments)
    and return the environment created after running that command.
    Note that the command must be a batch file or .cmd file, or the
    changes to the environment will not be captured.

    If initial is supplied, it is used as the initial environment passed
    to the child process.

    If valid_keys is supplied, only those variables are returned.
    encoding is used to decode the child's output.

    If the command fails, the marker line is never printed and the
    result is an empty dict.
    """
    if not isinstance(env_cmd, (list, tuple)):
        env_cmd = [env_cmd]
    # construct the command that will alter the environment
    env_cmd = subprocess.list2cmdline(env_cmd)
    # create a tag so we can tell in the output when the proc is done
    tag = 'Done running command'
    # construct a cmd.exe command that runs it and then dumps the environment
    cmd = f'cmd.exe /s /c "{env_cmd} && echo "{tag}" && set"'
    is_valid_pair = make_validator(valid_keys)

    def parse_line(raw):
        """Split one raw ``KEY=VALUE`` output line into a stripped tuple."""
        text = raw.decode(encoding).rstrip()
        return tuple(part.strip() for part in text.split('=', 1))

    # the context manager closes the pipe and waits for the process,
    # even if parsing raises
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, env=initial) as proc:
        lines = proc.stdout
        # consume whatever output occurs until the tag is reached
        # (takewhile also swallows the tag line itself)
        consume(itertools.takewhile(lambda raw: tag not in raw.decode(encoding), lines))
        # parse the remaining KEY=VALUE lines and keep only the valid pairs
        result = dict(filter(is_valid_pair, map(parse_line, lines)))
    return result


def load_vars():
    """Import the known easy-rsa variables from ``vars.bat`` into os.environ."""
    valid_keys = [
        "HOME",
        "KEY_CONFIG",
        "KEY_DIR",
        "DH_KEY_SIZE",
        "KEY_SIZE",
        "KEY_COUNTRY",
        "KEY_PROVINCE",
        "KEY_CITY",
        "KEY_ORG",
        "KEY_EMAIL",
        "KEY_OU",
        "PKCS11_MODULE_PATH",
        "PKCS11_PIN",
    ]
    envs = get_environment_from_batch_command("vars.bat", valid_keys=valid_keys)
    os.environ.update(envs)
    print("loaded from vars:")
    pprint(envs)
    print("---\n\n")


def set_name_vars(name: str):
    """Set the ``KEY_CN`` and ``KEY_NAME`` environment variables to *name*."""
    os.environ.update({
        "KEY_CN": name,
        "KEY_NAME": name,
    })


def parse_args():
    """Parse the command line: a client name and an optional e-mail."""
    parser = argparse.ArgumentParser(
        prog='make_client',
        description='Generate key and make client config',
        epilog='. . .',
    )
    parser.add_argument('client_name')
    parser.add_argument('-e', '--email')
    return parser.parse_args()


def main():
    """Create the client's certificate if needed and render its config.

    Does nothing except print a message when the config already exists.
    """
    args = parse_args()
    load_vars()
    set_name_vars(args.client_name)
    # Must come after load_vars(): for_client reads KEY_CONFIG from os.environ.
    filenames = Filenames.for_client(args.client_name)

    if filenames.is_config_exists():
        print(f"file already exists: {filenames.config}", file=sys.stderr)
        return

    if args.email:
        os.environ["KEY_EMAIL"] = args.email

    print("build client:", args.client_name)
    certs = CertFiles.from_files(filenames)

    if not filenames.is_cert_exists():
        print("make", end="... ", flush=True)
        certs.request()
        certs.sign()
        print("ok")

    print("render", end="...", flush=True)
    certs.render_client_config()
    print("done")


if __name__ == "__main__":
    main()