import requests import os import sys import logging from requests.auth import HTTPBasicAuth import platform from dotenv import load_dotenv # Load .env file from script directory script_dir = os.path.dirname(os.path.abspath(__file__)) load_dotenv(os.path.join(script_dir, '.env')) # Setup logging with systemd journal support (falls back to stdout) try: from systemd.journal import JournalHandler handler = JournalHandler(SYSLOG_IDENTIFIER='opnsense-cert-exporter') except ImportError: handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) log = logging.getLogger('opnsense-cert-exporter') log.addHandler(handler) log.setLevel(logging.INFO) def get_config(): """Load configuration from environment variables.""" required = ['OPNSENSE_API_KEY', 'OPNSENSE_API_SECRET', 'OPNSENSE_HOST'] missing = [var for var in required if not os.environ.get(var)] if missing: log.error(f"Missing required environment variables: {', '.join(missing)}") sys.exit(1) return { 'api_key': os.environ['OPNSENSE_API_KEY'], 'api_secret': os.environ['OPNSENSE_API_SECRET'], 'host': os.environ['OPNSENSE_HOST'], 'output_directory': os.environ.get('OUTPUT_DIRECTORY', './certs'), 'verify_ssl': os.environ.get('VERIFY_SSL', 'true').lower() != 'false', 'file_owner': os.environ.get('FILE_OWNER'), 'file_group': os.environ.get('FILE_GROUP'), 'file_mode': os.environ.get('FILE_MODE'), } def search_certificates(config, search_phrase): url = f"{config['host']}/api/trust/cert/search" payload = {"searchPhrase": search_phrase} response = requests.post( url, json=payload, auth=HTTPBasicAuth(config['api_key'], config['api_secret']), verify=config['verify_ssl'] ) response.raise_for_status() return response.json().get("rows", []) def export_certificate(config, uuid, format): url = f"{config['host']}/api/trust/cert/generate_file/{uuid}/{format}" response = requests.post( url, auth=HTTPBasicAuth(config['api_key'], config['api_secret']), verify=config['verify_ssl'] ) response.raise_for_status() return response.json().get("payload") def save_certificate(config, cert_data, filename, output_dir): """Save certificate to file if content has changed. Returns True if file was updated.""" os.makedirs(output_dir, exist_ok=True) filepath = os.path.join(output_dir, filename) # Check if content has changed if os.path.exists(filepath): with open(filepath, 'r') as f: if f.read() == cert_data: return False with open(filepath, 'w') as f: f.write(cert_data) if platform.system().lower() == "linux": try: import pwd import grp if config['file_mode']: os.chmod(filepath, int(config['file_mode'], 8)) if config['file_owner'] or config['file_group']: uid = pwd.getpwnam(config['file_owner'] or pwd.getpwuid(os.getuid()).pw_name).pw_uid gid = grp.getgrnam(config['file_group'] or grp.getgrgid(os.getgid()).gr_name).gr_gid os.chown(filepath, uid, gid) except Exception as e: log.warning(f"Error setting permissions: {e}") return True def main(): if len(sys.argv) != 2: log.error("Usage: python main.py ") sys.exit(1) cert_name = sys.argv[1] config = get_config() ### Search Certificates ### certificates = search_certificates(config, cert_name) if len(certificates) > 1: log.error("Search returned more than one certificate. Please adjust your search to only return a single one.") sys.exit(1) if len(certificates) == 0: log.error(f"No certificate found with search phrase: {cert_name}") sys.exit(1) certificate_uuid = certificates[0]['uuid'] output_dir = os.path.join(config['output_directory'], cert_name) ### Export certificate and private key ### certificate_data = export_certificate(config, certificate_uuid, 'crt') cert_updated = save_certificate(config, certificate_data, 'cert.pem', output_dir) private_key_data = export_certificate(config, certificate_uuid, 'prv') key_updated = save_certificate(config, private_key_data, 'privkey.pem', output_dir) if cert_updated or key_updated: log.info(f"Certificate updated: {output_dir}") else: log.info(f"Certificate unchanged: {output_dir}") if __name__ == "__main__": main()