136 lines
4.7 KiB
Python
136 lines
4.7 KiB
Python
|
import logging
|
||
|
import os
|
||
|
from typing import Optional, List, Dict
|
||
|
|
||
|
from pathvalidate import is_valid_filepath
|
||
|
|
||
|
from webpub1c.webpub.apache_config import ApacheConfig
|
||
|
from webpub1c.webpub.common import VRDConfig, urlpath_join
|
||
|
from webpub1c.webpub.webpublication import WebPublication
|
||
|
|
||
|
|
||
|
def infobase_data_blank(name: str):
    """ Build the info record for an infobase that is not publicated.

    Only 'name' is filled in: 'publicated' and 'is_file_infobase' are
    False and every remaining field is an empty string.
    """
    record = {'name': name, 'publicated': False}
    # Location fields stay empty for an infobase without a publication.
    for field in ('url', 'directory', 'vrd_filename', 'infobase_filepath'):
        record[field] = ''
    record['is_file_infobase'] = False
    return record
|
||
|
|
||
|
|
||
|
def publication_data(publication: WebPublication):
    """ Flatten a publication into a plain info record.

    'publicated' is always True; 'is_file_infobase' holds the result of
    calling publication.is_file_infobase(). The other fields are copied
    from the attributes of the same meaning on the publication.
    """
    fields = [
        ('name', publication.name),
        ('publicated', True),
        ('url', publication.url_path),
        ('directory', publication.directory),
        ('vrd_filename', publication.vrd_filename),
        ('infobase_filepath', publication.infobase_filepath),
        ('is_file_infobase', publication.is_file_infobase()),
    ]
    return dict(fields)
|
||
|
|
||
|
|
||
|
class PublicationManager:
    """ Manage web publications through an ApacheConfig instance.

    Settings are read from a mapping-like ``config`` object (it must
    support ``get``, ``in`` and item access).
    """

    def __init__(self, config, verbose: bool = False):
        """ Configure logging and build the ApacheConfig from ``config``.

        :param config: mapping-like settings; the keys read here are
            'vrd_params', 'apache_config', 'vrd_path', 'dir_path' and
            'url_base'.
        :param verbose: log at INFO level when True, WARNING otherwise.
        """
        level = logging.INFO if verbose else logging.WARNING
        logging.basicConfig(level=level)
        self._log = logging.getLogger("manager")
        self._log.setLevel(level)

        self._config = config

        vrd_params: Optional[VRDConfig] = self._config.get('vrd_params', None)
        apache_config: str = self._config.get('apache_config', '')
        self._vrd_path: str = self._config.get('vrd_path', '')
        self._dir_path: str = self._config.get('dir_path', '')
        self._url_base: str = self._config.get('url_base', '')

        self._apache_cfg = ApacheConfig(apache_config, self._vrd_path,
                                        self._dir_path, self._url_base,
                                        vrd_params)

    def _is_vrd_path_valid(self) -> bool:
        """ True when the configured 'vrd_path' is an existing directory """
        try:
            return os.path.isdir(self._vrd_path)
        except TypeError:
            # A key that is present but null/non-path must be reported as
            # invalid, not crash the whole check.
            return False

    def _is_dir_path_valid(self) -> bool:
        """ True when the configured 'dir_path' is an existing directory """
        try:
            return os.path.isdir(self._dir_path)
        except TypeError:
            # Same reasoning as for vrd_path: null/non-path means invalid.
            return False

    def _is_url_base_valid(self) -> bool:
        """ True when 'url_base' is a valid posix path starting with '/' """
        # Guard the type first: .startswith() below only exists on str.
        if not isinstance(self._url_base, str):
            return False
        return is_valid_filepath(self._url_base, platform='posix') and self._url_base.startswith('/')

    def _is_module_valid(self) -> bool:
        """ True when 'platform_path'/'ws_module' point to an existing file """
        if 'platform_path' not in self._config:
            return False
        if 'ws_module' not in self._config:
            return False
        try:
            module = os.path.join(self._config['platform_path'], self._config['ws_module'])
            return os.path.isfile(module)
        except TypeError:
            # os.path.join rejects None and other non-path values; a key
            # that is present but empty is simply an invalid setting.
            return False

    def check(self):
        """ Return a dict of boolean validity flags for the configuration """
        return {
            'is_apache_cfg_valid': self._apache_cfg.is_valid(),
            'is_vrd_path_valid': self._is_vrd_path_valid(),
            'is_dir_path_valid': self._is_dir_path_valid(),
            'is_url_base_valid': self._is_url_base_valid(),
            'is_module_valid': self._is_module_valid()
        }

    def has_module(self):
        """ Return the result of ApacheConfig.has_1cws_module() """
        return self._apache_cfg.has_1cws_module()

    def add_module(self):
        """ Add 1cws module to apache config

        Does nothing except log when the module is already present.
        Raises KeyError when 'platform_path' or 'ws_module' is missing
        from the config.
        """
        if self._apache_cfg.has_1cws_module():
            self._log.info('config unchanged')
            return

        module: str = os.path.join(self._config['platform_path'], self._config['ws_module'])
        self._apache_cfg.add_1cws_module(module)
        self._log.info('module added')

    def list(self) -> List[str]:
        """ List publication names """
        return list(self._apache_cfg.publications)

    def publications(self) -> List[Dict[str, object]]:
        """ List of publications, one info record per publication """
        return [publication_data(pub) for pub in self._apache_cfg.iter()]

    def get(self, ibname: str):
        """ Get publication info

        Returns None when ApacheConfig has no publication for ``ibname``.
        """
        publication = self._apache_cfg.get_publication(ibname)
        if publication is None:
            return None
        return publication_data(publication)

    def add(self, ibname: str, url: Optional[str] = None, file: str = '', force: bool = False) -> str:
        """ Add new publication

        Returns the url_path of the created publication.
        """
        publication = self._apache_cfg.create_publication(ibname, url, file, force)
        self._apache_cfg.add_publication(publication, force)
        self._log.info(f'publication added: {ibname}')
        return publication.url_path

    def set_url(self, ibname: str, url: str) -> None:
        """ Set publication url

        Raises KeyError when ``ibname`` has no publication.
        """
        publication = self._apache_cfg.get_publication(ibname)
        if publication is None:
            raise KeyError(f'infobase "{ibname}" not publicated')

        publication.url_path = urlpath_join(self._url_base, url)
        # Re-register under the new url: remove with destroy=False, then
        # add the same publication object back.
        self._apache_cfg.remove_publication(publication.name, destroy=False)
        self._apache_cfg.add_publication(publication)
        self._log.info(f'publication changed: {ibname}')

    def remove(self, ibname: str, force: bool = False):
        """ Remove publication

        ``force`` is passed through to ApacheConfig.remove_publication.
        """
        self._apache_cfg.remove_publication(ibname, force=force)
        self._log.info(f'publication removed: {ibname}')
|