#!/usr/bin/env python
# Copyright (c) 2024 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import ipaddress

# NOTES/TODO(s):
# 1. multilevel spec_keys validation
# 2. Allow count != 0 for daemons

ALLOWED_DAEMONS = ['host', 'mon', 'mgr', 'mds', 'nfs', 'osd', 'rgw', 'grafana',
                   'crash', 'prometheus', 'alertmanager', 'node-exporter',
                   'ingress']

ALLOWED_HOST_PLACEMENT_MODE = ['hosts', 'host_pattern', 'label', 'count']

CRUSH_ALLOWED_LOCATION = ['osd', 'host', 'chassis', 'rack', 'row', 'pdu',
                          'pod', 'room', 'datacenter', 'zone', 'region',
                          'root']

# Per-daemon constraints on the extra (kwargs) keys; empty means that no
# daemon type is constrained.
ALLOWED_EXTRA_KEYS = dict()

# Per-daemon constraints on the keys accepted in the 'spec' section.
ALLOWED_SPEC_KEYS = {
    'rgw': [
        'rgw_frontend_port',
        'rgw_frontend_type',
        'rgw_realm',
        'rgw_zone',
        'rgw_ip_address',
        'rgw_frontend_ssl_certificate',
        'ssl'
    ],
    'nfs': [
        'namespace',
        'pool'
    ],
    'ingress': [
        'backend_service',
        'frontend_port',
        'monitor_port',
        'virtual_ip',
        'virtual_ips_list',
        'virtual_interface_networks',
        'ssl_cert'
    ],
}


class CephPlacementSpec(object):
    '''
    Build the 'placement' section of a daemon spec.

    Only the non-empty selectors are stored as attributes; make_spec()
    picks the first available one in the order: hosts, host_pattern, label.
    '''

    def __init__(self, hosts: list, host_pattern: str, count: int,
                 label: str):
        # None and empty values are both treated as "not provided"
        if label:
            self.label = label
        if host_pattern:
            self.host_pattern = host_pattern
        if hosts:
            self.hosts = hosts
        self.count = count

    def __setattr__(self, key, value):
        self.__dict__[key] = value

    def make_spec(self):
        '''
        Return {'placement': {...}} or an empty dict when neither a
        selector nor a positive count is available.
        '''
        placement = {}
        # if the host list is passed, this should be the preferred way
        if getattr(self, 'hosts', None):
            placement['hosts'] = self.hosts
        # if no list is passed or an empty list is provided
        # let's check if a "host pattern" is provided
        elif getattr(self, 'host_pattern', None):
            placement['host_pattern'] = self.host_pattern
        elif getattr(self, 'label', None) is not None:
            placement['label'] = self.label

        # count can be combined with any selector, or be used alone: the
        # placement dict always exists at this point, hence a count-only
        # placement no longer fails
        if self.count is not None and self.count > 0:
            placement['count'] = self.count

        if not placement:
            return {}
        return {'placement': placement}


class CephHostSpec(object):
    '''
    Describe a host entry: service type, address, hostname, labels and
    an optional crush location.
    '''

    def __init__(self, daemon_type: str, daemon_addr: str,
                 daemon_hostname: str, labels: list,
                 location: dict = None,
                 ):
        self.daemon_type = daemon_type
        self.daemon_addr = daemon_addr
        self.daemon_hostname = daemon_hostname

        assert isinstance(labels, list)
        # drop duplicates, keeping the order in which labels were passed so
        # that the generated spec is stable across runs
        self.labels = list(dict.fromkeys(labels))

        # init crush location parameters
        if location and isinstance(location, dict):
            self.location = location
        else:
            self.location = {}

    def is_valid_crush_location(self):
        '''
        Return True when every location key is in CRUSH_ALLOWED_LOCATION.
        '''
        return all(k in CRUSH_ALLOWED_LOCATION for k in self.location)

    def make_daemon_spec(self):
        '''
        Return the host spec dict; 'labels' and 'location' are added only
        when they are not empty.

        Raises an Exception when the location contains keys that are not
        in CRUSH_ALLOWED_LOCATION.
        '''
        spec_template = {
            'service_type': self.daemon_type,
            'addr': self.daemon_addr,
            'hostname': self.daemon_hostname,
        }

        if self.labels:
            spec_template['labels'] = self.labels

        if self.location:
            if not self.is_valid_crush_location():
                raise Exception("Fatal: the spec should be composed by only "
                                "allowed keywords")
            spec_template['location'] = self.location

        return spec_template


class CephDaemonSpec(object):
    '''
    Describe a daemon: header (type, name, id), placement, networks,
    the 'spec' section and any extra keyword passed via kwargs.
    '''

    def __init__(self, daemon_type: str, daemon_id: str, daemon_name: str,
                 hosts: list, placement_pattern: str, networks: list,
                 spec: dict, label: str, count: int, unmanaged: bool,
                 **kwargs: dict):

        self.daemon_name = daemon_name
        self.daemon_id = daemon_id
        self.daemon_type = daemon_type
        self.hosts = hosts
        self.placement = placement_pattern
        self.label = label
        self.count = count
        self.unmanaged = unmanaged

        # network list where the current daemon should be bound
        self.networks = networks if networks else []

        # extra keywords definition (e.g. data_devices for OSD(s))
        self.extra = dict(kwargs)

        assert isinstance(spec, dict)
        self.spec = spec

    def __setattr__(self, key, value):
        self.__dict__[key] = value

    def validate_networks(self):
        '''
        Return False when no network is defined, True when all the
        networks can be parsed by ipaddress.ip_network.

        Raises an Exception on the first network that cannot be parsed.
        '''
        if not self.networks:
            return False

        for network in self.networks:
            try:
                ipaddress.ip_network(network)
            except ValueError as e:
                raise Exception(
                    f'Cannot parse network {network}: {e}') from e
        return True

    def make_daemon_spec(self):
        '''
        Return the daemon spec dict, made by the header, the networks (if
        any), the unmanaged flag, the extra keys, the placement and the
        'spec' section (if any).

        Raises an Exception when the extra or the spec keys are not
        allowed for the current daemon type, or a network is not valid.
        '''
        # the placement dict
        pl = CephPlacementSpec(self.hosts, self.placement, self.count,
                               self.label).make_spec()

        # the spec daemon header
        spec_template = {
            'service_type': self.daemon_type,
            'service_name': self.daemon_name,
            'service_id': self.daemon_id,
        }

        # the networks dict
        ntw = {}
        if self.validate_networks():
            ntw = {'networks': self.networks}

        # process extra parameters if present
        if not self.validate_keys(self.extra.keys(), ALLOWED_EXTRA_KEYS):
            raise Exception("Fatal: the spec should be composed by only "
                            "allowed keywords")

        # append the spec if provided
        sp = {}
        if self.spec:
            if not self.validate_keys(self.spec.keys(), ALLOWED_SPEC_KEYS):
                raise Exception("Fatal: the spec should be composed by only "
                                "allowed keywords")
            sp = {'spec': self.normalize_spec(self.filter_spec(self.spec))}

        unmgd = {
            'unmanaged': self.unmanaged,
        }

        # build the resulting daemon template: on duplicated keys the
        # rightmost dict wins
        return {**spec_template, **ntw, **unmgd, **self.extra, **pl, **sp}

    def normalize_spec(self, spec):
        '''
        For each spec key we need to make sure that ports are evaluated as
        int, otherwise cephadm fails when the spec is applied.

        The dict is updated in place and returned.
        '''
        for k, v in spec.items():
            if 'port' in k:
                spec[k] = int(v)
        return spec

    def filter_spec(self, spec):
        '''
        Return a copy of spec without the keys that have a falsy value.
        '''
        return {k: v for k, v in spec.items() if v}

    def validate_keys(self, spec, ALLOWED_KEYS):
        '''
        When the spec section is created, if constraints are defined for
        a given daemon, then this check is run to make sure only valid keys
        are provided.
        '''
        # an entry for the current daemon is not found
        # no checks are required (let ceph orch take care of
        # the validation)
        if self.daemon_type not in ALLOWED_KEYS:
            return True

        # a basic check on the spec dict: if some constraints
        # are specified, the provided keys should be contained
        # in the ALLOWED keys
        allowed = ALLOWED_KEYS[self.daemon_type]
        return all(item in allowed for item in spec)

    def log(self, msg):
        '''Print msg with a [DEBUG] prefix.'''
        print(f'[DEBUG] - {msg}')

    def whoami(self) -> str:
        '''Return "<daemon_type>.<daemon_id>".'''
        return f'{self.daemon_type}.{self.daemon_id}'


def export(content, fp):
    '''
    Dump content, preceded by a '---' line, to the fp path or to stdout
    when fp is empty; print 'Nothing to dump!' when content is empty.
    '''
    if not content:
        print('Nothing to dump!')
        return

    if fp:
        # the 'w' mode already truncates the file: no explicit reset needed
        with open(fp, 'w') as f:
            f.write('---\n')
            f.write(content)
    else:
        print('---')
        print(content.rstrip('\r\n'))