This commit is contained in:
2025-12-20 06:12:03 +01:00
parent 2cc0bf8f8e
commit 19d3ba4fa3
10 changed files with 513 additions and 93 deletions

126
main.py Normal file
View File

@@ -0,0 +1,126 @@
import requests
import os
import sys
import logging
from requests.auth import HTTPBasicAuth
import platform
from dotenv import load_dotenv
# Load .env file from script directory
script_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(script_dir, '.env'))
# Setup logging with systemd journal support (falls back to stdout)
try:
from systemd.journal import JournalHandler
handler = JournalHandler(SYSLOG_IDENTIFIER='opnsense-cert-exporter')
except ImportError:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
log = logging.getLogger('opnsense-cert-exporter')
log.addHandler(handler)
log.setLevel(logging.INFO)
def get_config():
"""Load configuration from environment variables."""
required = ['OPNSENSE_API_KEY', 'OPNSENSE_API_SECRET', 'OPNSENSE_HOST']
missing = [var for var in required if not os.environ.get(var)]
if missing:
log.error(f"Missing required environment variables: {', '.join(missing)}")
sys.exit(1)
return {
'api_key': os.environ['OPNSENSE_API_KEY'],
'api_secret': os.environ['OPNSENSE_API_SECRET'],
'host': os.environ['OPNSENSE_HOST'],
'output_directory': os.environ.get('OUTPUT_DIRECTORY', './certs'),
'file_owner': os.environ.get('FILE_OWNER'),
'file_group': os.environ.get('FILE_GROUP'),
'file_mode': os.environ.get('FILE_MODE'),
}
def search_certificates(config, search_phrase):
url = f"{config['host']}/api/trust/cert/search"
payload = {"searchPhrase": search_phrase}
response = requests.post(url, json=payload, auth=HTTPBasicAuth(config['api_key'], config['api_secret']))
response.raise_for_status()
return response.json().get("rows", [])
def export_certificate(config, uuid, format):
url = f"{config['host']}/api/trust/cert/generate_file/{uuid}/{format}"
response = requests.post(url, auth=HTTPBasicAuth(config['api_key'], config['api_secret']))
response.raise_for_status()
return response.json().get("payload")
def save_certificate(config, cert_data, filename, output_dir):
"""Save certificate to file if content has changed. Returns True if file was updated."""
os.makedirs(output_dir, exist_ok=True)
filepath = os.path.join(output_dir, filename)
# Check if content has changed
if os.path.exists(filepath):
with open(filepath, 'r') as f:
if f.read() == cert_data:
return False
with open(filepath, 'w') as f:
f.write(cert_data)
if platform.system().lower() == "linux":
try:
import pwd
import grp
if config['file_mode']:
os.chmod(filepath, int(config['file_mode'], 8))
if config['file_owner'] or config['file_group']:
uid = pwd.getpwnam(config['file_owner'] or pwd.getpwuid(os.getuid()).pw_name).pw_uid
gid = grp.getgrnam(config['file_group'] or grp.getgrgid(os.getgid()).gr_name).gr_gid
os.chown(filepath, uid, gid)
except Exception as e:
log.warning(f"Error setting permissions: {e}")
return True
def main():
if len(sys.argv) != 2:
log.error("Usage: python main.py <certificate_name>")
sys.exit(1)
cert_name = sys.argv[1]
config = get_config()
### Search Certificates ###
certificates = search_certificates(config, cert_name)
if len(certificates) > 1:
log.error("Search returned more than one certificate. Please adjust your search to only return a single one.")
sys.exit(1)
if len(certificates) == 0:
log.error(f"No certificate found with search phrase: {cert_name}")
sys.exit(1)
certificate_uuid = certificates[0]['uuid']
output_dir = os.path.join(config['output_directory'], cert_name)
### Export certificate and private key ###
certificate_data = export_certificate(config, certificate_uuid, 'crt')
cert_updated = save_certificate(config, certificate_data, 'cert.pem', output_dir)
private_key_data = export_certificate(config, certificate_uuid, 'prv')
key_updated = save_certificate(config, private_key_data, 'privkey.pem', output_dir)
if cert_updated or key_updated:
log.info(f"Certificate updated: {output_dir}")
else:
log.info(f"Certificate unchanged: {output_dir}")
if __name__ == "__main__":
main()