137 lines
4.5 KiB
Python
137 lines
4.5 KiB
Python
import requests
|
|
import os
|
|
import sys
|
|
import logging
|
|
from requests.auth import HTTPBasicAuth
|
|
import platform
|
|
from dotenv import load_dotenv
|
|
|
|
# Pull settings from the .env file that sits alongside this script.
script_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(script_dir, '.env'))

# Log to the systemd journal when its Python bindings can be imported;
# otherwise use a plain StreamHandler with a "LEVEL: message" format.
try:
    from systemd.journal import JournalHandler
    handler = JournalHandler(SYSLOG_IDENTIFIER='opnsense-cert-exporter')
except ImportError:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))

# Module-wide logger used by every function below.
log = logging.getLogger('opnsense-cert-exporter')
log.setLevel(logging.INFO)
log.addHandler(handler)
|
|
|
|
|
|
def get_config():
    """Build the exporter configuration from environment variables.

    ``OPNSENSE_API_KEY``, ``OPNSENSE_API_SECRET`` and ``OPNSENSE_HOST`` are
    mandatory. If any of them is unset or empty, an error is logged and the
    process exits with status 1.

    Returns:
        dict: Configuration with the keys ``api_key``, ``api_secret``,
        ``host`` (surrounding whitespace and trailing slashes removed),
        ``output_directory`` (default ``./certs``), ``verify_ssl`` (``False``
        only when ``VERIFY_SSL`` equals ``false`` in any letter case) and the
        optional ``file_owner``, ``file_group`` and ``file_mode`` (``None``
        when unset).
    """
    required = ('OPNSENSE_API_KEY', 'OPNSENSE_API_SECRET', 'OPNSENSE_HOST')
    missing = [var for var in required if not os.environ.get(var)]
    if missing:
        log.error(f"Missing required environment variables: {', '.join(missing)}")
        sys.exit(1)

    env = os.environ
    return {
        'api_key': env['OPNSENSE_API_KEY'],
        'api_secret': env['OPNSENSE_API_SECRET'],
        # Callers append "/api/..." to the host, so a trailing slash in the
        # configured value would otherwise produce "//api/..." in the URL.
        'host': env['OPNSENSE_HOST'].strip().rstrip('/'),
        'output_directory': env.get('OUTPUT_DIRECTORY', './certs'),
        'verify_ssl': env.get('VERIFY_SSL', 'true').lower() != 'false',
        'file_owner': env.get('FILE_OWNER'),
        'file_group': env.get('FILE_GROUP'),
        'file_mode': env.get('FILE_MODE'),
    }
|
|
|
|
|
|
def search_certificates(config, search_phrase):
    """Search the OPNsense trust store for certificates matching a phrase.

    Sends a POST to ``<host>/api/trust/cert/search`` using HTTP basic auth
    built from the configured API key and secret.

    Args:
        config: Dict with ``host``, ``api_key``, ``api_secret`` and
            ``verify_ssl``. An optional ``timeout`` entry (seconds)
            overrides the 30-second default.
        search_phrase: Value sent as ``searchPhrase`` in the JSON body.

    Returns:
        The ``rows`` value of the JSON response, or an empty list when the
        response has no ``rows`` key.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    url = f"{config['host']}/api/trust/cert/search"
    response = requests.post(
        url,
        json={"searchPhrase": search_phrase},
        auth=HTTPBasicAuth(config['api_key'], config['api_secret']),
        verify=config['verify_ssl'],
        # Without a timeout requests waits forever on an unresponsive host.
        timeout=config.get('timeout', 30),
    )
    response.raise_for_status()
    return response.json().get("rows", [])
|
|
|
|
|
|
def export_certificate(config, uuid, format):
    """Download one exported artefact of a certificate from OPNsense.

    Sends a POST to ``<host>/api/trust/cert/generate_file/<uuid>/<format>``
    using HTTP basic auth built from the configured API key and secret.

    Args:
        config: Dict with ``host``, ``api_key``, ``api_secret`` and
            ``verify_ssl``. An optional ``timeout`` entry (seconds)
            overrides the 30-second default.
        uuid: Identifier of the certificate, placed in the URL path.
        format: Export format placed in the URL path (this script uses
            ``crt`` and ``prv``). The name shadows the builtin but is kept
            so existing callers keep working.

    Returns:
        The ``payload`` value of the JSON response, or ``None`` when the
        response has no ``payload`` key.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within the timeout.
    """
    url = f"{config['host']}/api/trust/cert/generate_file/{uuid}/{format}"
    response = requests.post(
        url,
        auth=HTTPBasicAuth(config['api_key'], config['api_secret']),
        verify=config['verify_ssl'],
        # Without a timeout requests waits forever on an unresponsive host.
        timeout=config.get('timeout', 30),
    )
    response.raise_for_status()
    return response.json().get("payload")
|
|
|
|
|
|
def save_certificate(config, cert_data, filename, output_dir):
    """Write certificate material to disk when its content has changed.

    On Linux the configured mode and ownership are applied after a write on
    a best-effort basis: any failure there is logged as a warning and does
    not affect the return value.

    Args:
        config: Dict providing the optional ``file_mode`` (octal string),
            ``file_owner`` and ``file_group`` entries.
        cert_data: Text to store; must be a non-empty string.
        filename: Name of the file inside ``output_dir``.
        output_dir: Target directory; created when missing.

    Returns:
        bool: ``True`` if the file was written, ``False`` if it already held
        exactly ``cert_data``.

    Raises:
        TypeError: If ``cert_data`` is not a string.
        ValueError: If ``cert_data`` is an empty string.
    """
    # Validate before touching the filesystem: opening the target in 'w'
    # mode truncates it, so a bad payload would destroy a good file.
    if not isinstance(cert_data, str):
        raise TypeError(
            f"cert_data must be a string, got {type(cert_data).__name__}"
        )
    if not cert_data:
        raise ValueError("cert_data is empty; refusing to write an empty file")

    os.makedirs(output_dir, exist_ok=True)
    filepath = os.path.join(output_dir, filename)

    # Skip the write (and the permission handling) when nothing changed.
    try:
        with open(filepath, 'r') as f:
            if f.read() == cert_data:
                return False
    except FileNotFoundError:
        pass

    # NOTE(review): the file gets the process's default permissions here and
    # is only adjusted by the chmod below, when file_mode is set on Linux.
    with open(filepath, 'w') as f:
        f.write(cert_data)

    if platform.system().lower() == "linux":
        # Best effort by design: a permission problem must not undo a
        # successful write, so every failure is reduced to a warning.
        try:
            import grp
            import pwd

            if config['file_mode']:
                os.chmod(filepath, int(config['file_mode'], 8))

            owner = config['file_owner']
            group = config['file_group']
            if owner or group:
                # Whichever of owner/group is not configured falls back to
                # the id of the running process.
                uid = pwd.getpwnam(owner).pw_uid if owner else os.getuid()
                gid = grp.getgrnam(group).gr_gid if group else os.getgid()
                os.chown(filepath, uid, gid)
        except Exception as e:
            log.warning(f"Error setting permissions: {e}")

    return True
|
|
|
|
|
|
def main():
    """Export one certificate and its private key from OPNsense to disk.

    Expects exactly one command-line argument: the phrase used to search for
    the certificate. The same value names the sub-directory of the configured
    output directory that receives ``cert.pem`` and ``privkey.pem``.

    The process exits with status 1 when the argument count is wrong, an API
    request fails, the search matches zero or several certificates, or an
    export returns no payload. Nothing is written unless both the certificate
    and the private key were fetched successfully.
    """
    if len(sys.argv) != 2:
        log.error("Usage: python main.py <certificate_name>")
        sys.exit(1)

    cert_name = sys.argv[1]
    config = get_config()

    ### Search Certificates ###
    try:
        certificates = search_certificates(config, cert_name)
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a response body that is not valid JSON.
        log.error(f"Certificate search failed: {exc}")
        sys.exit(1)

    if len(certificates) > 1:
        log.error("Search returned more than one certificate. Please adjust your search to only return a single one.")
        sys.exit(1)
    if not certificates:
        log.error(f"No certificate found with search phrase: {cert_name}")
        sys.exit(1)

    certificate_uuid = certificates[0]['uuid']
    output_dir = os.path.join(config['output_directory'], cert_name)

    ### Export certificate and private key ###
    # Fetch both parts before writing either one, so a failed key export
    # cannot leave a new certificate next to an old private key.
    try:
        certificate_data = export_certificate(config, certificate_uuid, 'crt')
        private_key_data = export_certificate(config, certificate_uuid, 'prv')
    except (requests.RequestException, ValueError) as exc:
        log.error(f"Certificate export failed: {exc}")
        sys.exit(1)

    if not certificate_data:
        log.error(f"Export returned no certificate data for: {cert_name}")
        sys.exit(1)
    if not private_key_data:
        log.error(f"Export returned no private key data for: {cert_name}")
        sys.exit(1)

    cert_updated = save_certificate(config, certificate_data, 'cert.pem', output_dir)
    key_updated = save_certificate(config, private_key_data, 'privkey.pem', output_dir)

    if cert_updated or key_updated:
        log.info(f"Certificate updated: {output_dir}")
    else:
        log.info(f"Certificate unchanged: {output_dir}")
|
|
|
|
|
|
# Run the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|