File manager - Edit - /var/www/payraty/helpdesk/public/storage/branding_media/images/netplan.zip
Back
PK ! �u� � 50-cloud-init.yamlnu �[��� # This file is generated from information provided by the datasource. Changes # to it will not persist across an instance reboot. To disable cloud-init's # network configuration capabilities, write a file # /etc/cloud/cloud.cfg.d/99-disable-network-config.cfg with the following: # network: {config: disabled} network: ethernets: eth0: dhcp4: true dhcp6: false match: macaddress: 06:3b:6d:c0:fb:8f set-name: eth0 version: 2 PK ! �G�� � netplan/configmanager.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2018 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. '''netplan configuration manager''' import logging import os import shutil import sys import tempfile from typing import Optional from netplan import libnetplan class ConfigManager(object): def __init__(self, prefix="/", extra_files={}): self.prefix = prefix self.tempdir = tempfile.mkdtemp(prefix='netplan_') self.temp_etc = os.path.join(self.tempdir, "etc") self.temp_run = os.path.join(self.tempdir, "run") self.extra_files = extra_files self.new_interfaces = set() self.np_state: Optional[libnetplan.State] = None def __getattr__(self, attr): assert self.np_state is not None, "Must call parse() before accessing the config." return getattr(self.np_state, attr) @property def physical_interfaces(self): assert self.np_state is not None, "Must call parse() before accessing the config." 
interfaces = {} interfaces.update(self.np_state.ethernets) interfaces.update(self.np_state.modems) interfaces.update(self.np_state.wifis) return interfaces @property def virtual_interfaces(self): assert self.np_state is not None, "Must call parse() before accessing the config." interfaces = {} # what about ovs_ports? interfaces.update(self.np_state.bridges) interfaces.update(self.np_state.bonds) interfaces.update(self.np_state.tunnels) interfaces.update(self.np_state.vlans) interfaces.update(self.np_state.vrfs) return interfaces def parse(self, extra_config=None): """ Parse all our config files to return an object that describes the system's entire configuration, so that it can later be interrogated. Returns a libnetplan State wrapper """ # /run/netplan shadows /etc/netplan/, which shadows /lib/netplan parser = libnetplan.Parser() try: parser.load_yaml_hierarchy(rootdir=self.prefix) if extra_config: for f in extra_config: parser.load_yaml(f) self.np_state = libnetplan.State() self.np_state.import_parser_results(parser) except libnetplan.LibNetplanException as e: raise ConfigurationError(*e.args) self.np_state.dump_to_logs() return self.np_state def add(self, config_dict): for config_file in config_dict: self._copy_file(config_file, config_dict[config_file]) self.extra_files.update(config_dict) # Invalidate the current parsed state self.np_state = None def backup(self, backup_config_dir=True): if backup_config_dir: self._copy_tree(os.path.join(self.prefix, "etc/netplan"), os.path.join(self.temp_etc, "netplan")) self._copy_tree(os.path.join(self.prefix, "run/NetworkManager/system-connections"), os.path.join(self.temp_run, "NetworkManager", "system-connections"), missing_ok=True) self._copy_tree(os.path.join(self.prefix, "run/systemd/network"), os.path.join(self.temp_run, "systemd", "network"), missing_ok=True) def revert(self): try: for extra_file in dict(self.extra_files): os.unlink(self.extra_files[extra_file]) del self.extra_files[extra_file] temp_nm_path = 
"{}/NetworkManager/system-connections".format(self.temp_run) temp_networkd_path = "{}/systemd/network".format(self.temp_run) if os.path.exists(temp_nm_path): shutil.rmtree(os.path.join(self.prefix, "run/NetworkManager/system-connections")) self._copy_tree(temp_nm_path, os.path.join(self.prefix, "run/NetworkManager/system-connections")) if os.path.exists(temp_networkd_path): shutil.rmtree(os.path.join(self.prefix, "run/systemd/network")) self._copy_tree(temp_networkd_path, os.path.join(self.prefix, "run/systemd/network")) except Exception as e: # pragma: nocover (only relevant to filesystem failures) # If we reach here, we're in big trouble. We may have wiped out # file NM or networkd are using, and we most likely removed the # "new" config -- or at least our copy of it. # Given that we're in some halfway done revert; warn the user # aggressively and drop everything; leaving any remaining backups # around for the user to handle themselves. logging.error("Something really bad happened while reverting config: {}".format(e)) logging.error("You should verify the netplan YAML in /etc/netplan and probably run 'netplan apply' again.") sys.exit(-1) def cleanup(self): shutil.rmtree(self.tempdir) def __del__(self): try: self.cleanup() except FileNotFoundError: # If cleanup() was called before, there is nothing to delete pass def _copy_file(self, src, dst): shutil.copy(src, dst) def _copy_tree(self, src, dst, missing_ok=False): try: shutil.copytree(src, dst) except FileNotFoundError: if missing_ok: pass else: raise class ConfigurationError(Exception): """ Configuration could not be parsed or has otherwise failed to apply """ pass PK ! ���� � netplan/terminal.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2018 Canonical, Ltd. 
# Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ Terminal / input handling """ import fcntl import os import termios import select import sys class Terminal(object): """ Do minimal terminal mangling to prompt users for input """ def __init__(self, fd): self.fd = fd self.orig_flags = None self.orig_term = None self.save() def enable_echo(self): if sys.stdin.isatty(): attrs = termios.tcgetattr(self.fd) attrs[3] = attrs[3] | termios.ICANON attrs[3] = attrs[3] | termios.ECHO termios.tcsetattr(self.fd, termios.TCSANOW, attrs) def disable_echo(self): if sys.stdin.isatty(): attrs = termios.tcgetattr(self.fd) attrs[3] = attrs[3] & ~termios.ICANON attrs[3] = attrs[3] & ~termios.ECHO termios.tcsetattr(self.fd, termios.TCSANOW, attrs) def enable_nonblocking_io(self): flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) def disable_nonblocking_io(self): flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) fcntl.fcntl(self.fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) def get_confirmation_input(self, timeout=120, message=None): # pragma: nocover (requires user input) """ Get a "confirmation" input from the user, for at most (timeout) seconds. Optionally, customize the message to be displayed. 
timeout -- timeout to wait for input (default 120) message -- optional customized message ("Press ENTER to (message)") raises: InputAccepted -- the user confirmed the changes InputRejected -- the user rejected the changes """ print("Do you want to keep these settings?\n\n") settings = dict() self.save(settings) self.disable_echo() self.enable_nonblocking_io() if not message: message = "accept the new configuration" print("Press ENTER before the timeout to {}\n\n".format(message)) timeout_now = timeout while (timeout_now > 0): print("Changes will revert in {:>{}} seconds".format(timeout_now, len(str(timeout))), end='\r') # wait at most 1 second for usable input from stdin select.select([sys.stdin], [], [], 1) try: # retrieve any input from the terminal. select() either has # timed out with no input, or found something we can retrieve. c = sys.stdin.read() if (c == '\n'): self.reset(settings) # Yay, user has accepted the changes! raise InputAccepted() except TypeError: # read() above is non-blocking, if there is nothing to read it # will return TypeError, which we should ignore -- on to the # next iteration until timeout. pass timeout_now -= 1 # We reached the timeout for our loop, now revert our change for # non-blocking I/O and signal the caller the changes were essentially # rejected. 
self.reset(settings) raise InputRejected() def save(self, dest=None): """ Save the terminal's current attributes and flags Optional argument: - dest: if set, save settings to this dict """ orig_flags = fcntl.fcntl(self.fd, fcntl.F_GETFL) orig_term = None if sys.stdin.isatty(): orig_term = termios.tcgetattr(self.fd) if dest is not None: dest.update({'flags': orig_flags, 'term': orig_term}) else: self.orig_flags = orig_flags self.orig_term = orig_term def reset(self, orig=None): """ Reset the terminal to its original attributes and flags Optional argument: - orig: if set, reset to settings from this dict """ orig_term = None orig_flags = None if orig is not None: orig_term = orig.get('term') orig_flags = orig.get('flags') else: orig_term = self.orig_term orig_flags = self.orig_flags if sys.stdin.isatty(): termios.tcsetattr(self.fd, termios.TCSAFLUSH, orig_term) fcntl.fcntl(self.fd, fcntl.F_SETFL, orig_flags) class InputAccepted(Exception): """ Denotes has accepted input""" pass class InputRejected(Exception): """ Denotes that the user has rejected input""" pass PK ! ��PH PH netplan/libnetplan.pynu �[��� # Copyright (C) 2018-2020 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # Author: Łukasz 'sil2100' Zemczak <lukasz.zemczak@canonical.com> # Author: Lukas 'slyon' Märdian <lukas.maerdian@canonical.com> # Author: Simon Chopin <simon.chopin@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. 
import tempfile import logging import ctypes import ctypes.util from ctypes import c_char_p, c_void_p, c_int, c_uint, c_size_t, c_ssize_t from typing import List, Union, IO class LibNetplanException(Exception): pass class _GError(ctypes.Structure): _fields_ = [("domain", ctypes.c_uint32), ("code", c_int), ("message", c_char_p)] class _netplan_state(ctypes.Structure): pass class _netplan_parser(ctypes.Structure): pass class _netplan_net_definition(ctypes.Structure): pass lib = ctypes.CDLL(ctypes.util.find_library('netplan')) _GErrorPP = ctypes.POINTER(ctypes.POINTER(_GError)) _NetplanParserP = ctypes.POINTER(_netplan_parser) _NetplanStateP = ctypes.POINTER(_netplan_state) _NetplanNetDefinitionP = ctypes.POINTER(_netplan_net_definition) lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p def _string_realloc_call_no_error(function): size = 16 while size < 1048576: # 1MB buffer = ctypes.create_string_buffer(size) code = function(buffer) if code == -2: size = size * 2 continue if code < 0: # pragma: nocover raise LibNetplanException("Unknown error: %d" % code) elif code == 0: return None # pragma: nocover as it's hard to trigger for now else: return buffer.value.decode('utf-8') raise LibNetplanException('Halting due to string buffer size > 1M') # pragma: nocover def _checked_lib_call(fn, *args): err = ctypes.POINTER(_GError)() ret = bool(fn(*args, ctypes.byref(err))) if not ret: raise LibNetplanException(err.contents.message.decode('utf-8')) class Parser: _abi_loaded = False @classmethod def _load_abi(cls): if cls._abi_loaded: return lib.netplan_parser_new.restype = _NetplanParserP lib.netplan_parser_clear.argtypes = [ctypes.POINTER(_NetplanParserP)] lib.netplan_parser_load_yaml.argtypes = [_NetplanParserP, c_char_p, _GErrorPP] lib.netplan_parser_load_yaml.restype = c_int lib.netplan_parser_load_yaml_from_fd.argtypes = [_NetplanParserP, c_int, _GErrorPP] lib.netplan_parser_load_yaml_from_fd.restype = c_int lib.netplan_parser_load_nullable_fields.argtypes = 
[_NetplanParserP, c_int, _GErrorPP] lib.netplan_parser_load_nullable_fields.restype = c_int lib.netplan_parser_load_nullable_overrides.argtypes =\ [_NetplanParserP, c_int, c_char_p, _GErrorPP] lib.netplan_parser_load_nullable_overrides.restype = c_int cls._abi_loaded = True def __init__(self): self._load_abi() self._ptr = lib.netplan_parser_new() def __del__(self): lib.netplan_parser_clear(ctypes.byref(self._ptr)) def load_yaml(self, input_file: Union[str, IO]): if isinstance(input_file, str): _checked_lib_call(lib.netplan_parser_load_yaml, self._ptr, input_file.encode('utf-8')) else: _checked_lib_call(lib.netplan_parser_load_yaml_from_fd, self._ptr, input_file.fileno()) def load_yaml_hierarchy(self, rootdir): _checked_lib_call(lib.netplan_parser_load_yaml_hierarchy, self._ptr, rootdir.encode('utf-8')) def load_nullable_fields(self, input_file: IO): _checked_lib_call(lib.netplan_parser_load_nullable_fields, self._ptr, input_file.fileno()) def load_nullable_overrides(self, input_file: IO, constraint: str): _checked_lib_call(lib.netplan_parser_load_nullable_overrides, self._ptr, input_file.fileno(), constraint.encode('utf-8')) class State: _abi_loaded = False @classmethod def _load_abi(cls): if cls._abi_loaded: return lib.netplan_state_new.restype = _NetplanStateP lib.netplan_state_clear.argtypes = [ctypes.POINTER(_NetplanStateP)] lib.netplan_state_import_parser_results.argtypes = [_NetplanStateP, _NetplanParserP, _GErrorPP] lib.netplan_state_import_parser_results.restype = c_int lib.netplan_state_get_netdefs_size.argtypes = [_NetplanStateP] lib.netplan_state_get_netdefs_size.restype = c_int lib.netplan_state_get_netdef.argtypes = [_NetplanStateP, c_char_p] lib.netplan_state_get_netdef.restype = _NetplanNetDefinitionP lib.netplan_state_write_yaml_file.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP] lib.netplan_state_write_yaml_file.restype = c_int lib.netplan_state_update_yaml_hierarchy.argtypes = [_NetplanStateP, c_char_p, c_char_p, _GErrorPP] 
lib.netplan_state_update_yaml_hierarchy.restype = c_int lib.netplan_state_dump_yaml.argtypes = [_NetplanStateP, c_int, _GErrorPP] lib.netplan_state_dump_yaml.restype = c_int lib.netplan_netdef_get_embedded_switch_mode.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_embedded_switch_mode.restype = c_char_p lib.netplan_netdef_get_delay_virtual_functions_rebind.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_delay_virtual_functions_rebind.restype = c_int lib.netplan_state_get_backend.argtypes = [_NetplanStateP] lib.netplan_state_get_backend.restype = c_int cls._abi_loaded = True def __init__(self): self._load_abi() self._ptr = lib.netplan_state_new() def __del__(self): lib.netplan_state_clear(ctypes.byref(self._ptr)) def import_parser_results(self, parser): _checked_lib_call(lib.netplan_state_import_parser_results, self._ptr, parser._ptr) def write_yaml_file(self, filename, rootdir): name = filename.encode('utf-8') if filename else None root = rootdir.encode('utf-8') if rootdir else None _checked_lib_call(lib.netplan_state_write_yaml_file, self._ptr, name, root) def update_yaml_hierarchy(self, default_filename, rootdir): name = default_filename.encode('utf-8') root = rootdir.encode('utf-8') if rootdir else None _checked_lib_call(lib.netplan_state_update_yaml_hierarchy, self._ptr, name, root) def dump_yaml(self, output_file): fd = output_file.fileno() _checked_lib_call(lib.netplan_state_dump_yaml, self._ptr, fd) def __len__(self): return lib.netplan_state_get_netdefs_size(self._ptr) def __getitem__(self, def_id): ptr = lib.netplan_state_get_netdef(self._ptr, def_id.encode('utf-8')) if not ptr: raise IndexError() return NetDefinition(self, ptr) @property def all_defs(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, None)) @property def ethernets(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "ethernets")) @property def modems(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "modems")) @property def 
wifis(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "wifis")) @property def vlans(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "vlans")) @property def bridges(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "bridges")) @property def bonds(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "bonds")) @property def tunnels(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "tunnels")) @property def vrfs(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "vrfs")) @property def ovs_ports(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "_ovs-ports")) @property def nm_devices(self): return dict((nd.id, nd) for nd in _NetdefIterator(self, "nm-devices")) @property def backend(self): return lib.netplan_backend_name(lib.netplan_state_get_backend(self._ptr)).decode('utf-8') def dump_to_logs(self): # Convoluted way to dump the parsed config to the logs... with tempfile.TemporaryFile() as tmp: self.dump_yaml(output_file=tmp) logging.debug("Merged config:\n{}".format(tmp.read())) class NetDefinition: _abi_loaded = False @classmethod def _load_abi(cls): if cls._abi_loaded: return lib.netplan_netdef_has_match.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_has_match.restype = c_int lib.netplan_netdef_get_id.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] lib.netplan_netdef_get_id.restype = c_ssize_t lib.netplan_netdef_get_filepath.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] lib.netplan_netdef_get_filepath.restype = c_ssize_t lib.netplan_netdef_get_backend.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_backend.restype = c_int lib.netplan_netdef_get_type.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_type.restype = c_int lib.netplan_netdef_get_set_name.argtypes = [_NetplanNetDefinitionP, c_char_p, c_size_t] lib.netplan_netdef_get_set_name.restype = c_ssize_t lib._netplan_netdef_get_critical.argtypes = [_NetplanNetDefinitionP] 
lib._netplan_netdef_get_critical.restype = c_int lib.netplan_netdef_get_sriov_link.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_sriov_link.restype = _NetplanNetDefinitionP lib.netplan_netdef_get_vlan_link.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_vlan_link.restype = _NetplanNetDefinitionP lib.netplan_netdef_get_bridge_link.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_bridge_link.restype = _NetplanNetDefinitionP lib.netplan_netdef_get_bond_link.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_bond_link.restype = _NetplanNetDefinitionP lib.netplan_netdef_get_peer_link.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_get_peer_link.restype = _NetplanNetDefinitionP lib._netplan_netdef_get_vlan_id.argtypes = [_NetplanNetDefinitionP] lib._netplan_netdef_get_vlan_id.restype = c_uint lib._netplan_netdef_get_sriov_vlan_filter.argtypes = [_NetplanNetDefinitionP] lib._netplan_netdef_get_sriov_vlan_filter.restype = c_int lib.netplan_netdef_match_interface.argtypes = [_NetplanNetDefinitionP] lib.netplan_netdef_match_interface.restype = c_int lib.netplan_backend_name.argtypes = [c_int] lib.netplan_backend_name.restype = c_char_p lib.netplan_def_type_name.argtypes = [c_int] lib.netplan_def_type_name.restype = c_char_p lib._netplan_state_get_vf_count_for_def.argtypes = [_NetplanStateP, _NetplanNetDefinitionP, _GErrorPP] lib._netplan_state_get_vf_count_for_def.restype = c_int lib._netplan_netdef_is_trivial_compound_itf.argtypes = [_NetplanNetDefinitionP] lib._netplan_netdef_is_trivial_compound_itf.restype = c_int cls._abi_loaded = True def __eq__(self, other): if not hasattr(other, '_ptr'): return False return ctypes.addressof(self._ptr.contents) == ctypes.addressof(other._ptr.contents) def __init__(self, np_state, ptr): self._load_abi() self._ptr = ptr # We hold on to this to avoid the underlying pointer being invalidated by # the GC invoking netplan_state_free self._parent = np_state @property def has_match(self): return 
bool(lib.netplan_netdef_has_match(self._ptr)) @property def set_name(self): return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_set_name(self._ptr, b, len(b))) @property def critical(self): return bool(lib._netplan_netdef_get_critical(self._ptr)) @property def sriov_link(self): link_ptr = lib.netplan_netdef_get_sriov_link(self._ptr) if link_ptr: return NetDefinition(self._parent, link_ptr) return None @property def vlan_link(self): link_ptr = lib.netplan_netdef_get_vlan_link(self._ptr) if link_ptr: return NetDefinition(self._parent, link_ptr) return None @property def bridge_link(self): link_ptr = lib.netplan_netdef_get_bridge_link(self._ptr) if link_ptr: return NetDefinition(self._parent, link_ptr) return None @property def bond_link(self): link_ptr = lib.netplan_netdef_get_bond_link(self._ptr) if link_ptr: return NetDefinition(self._parent, link_ptr) return None @property def peer_link(self): link_ptr = lib.netplan_netdef_get_peer_link(self._ptr) if link_ptr: return NetDefinition(self._parent, link_ptr) return None # pragma: nocover (ovs ports are always defined in pairs) @property def vlan_id(self): vlan_id = lib._netplan_netdef_get_vlan_id(self._ptr) # No easy way to get UINT_MAX besides this... 
if vlan_id == c_uint(-1).value: return None return vlan_id @property def has_sriov_vlan_filter(self): return bool(lib._netplan_netdef_get_sriov_vlan_filter(self._ptr)) @property def backend(self): return lib.netplan_backend_name(lib.netplan_netdef_get_backend(self._ptr)).decode('utf-8') @property def type(self): return lib.netplan_def_type_name(lib.netplan_netdef_get_type(self._ptr)).decode('utf-8') @property def id(self): return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_id(self._ptr, b, len(b))) @property def filepath(self): return _string_realloc_call_no_error(lambda b: lib.netplan_netdef_get_filepath(self._ptr, b, len(b))) @property def embedded_switch_mode(self): mode = lib.netplan_netdef_get_embedded_switch_mode(self._ptr) return mode and mode.decode('utf-8') @property def delay_virtual_functions_rebind(self): return bool(lib.netplan_netdef_get_delay_virtual_functions_rebind(self._ptr)) def match_interface(self, itf_name=None, itf_driver=None, itf_mac=None): return bool(lib.netplan_netdef_match_interface( self._ptr, itf_name and itf_name.encode('utf-8'), itf_mac and itf_mac.encode('utf-8'), itf_driver and itf_driver.encode('utf-8'))) @property def vf_count(self): err = ctypes.POINTER(_GError)() count = lib._netplan_state_get_vf_count_for_def(self._parent._ptr, self._ptr, ctypes.byref(err)) if count < 0: raise LibNetplanException(err.contents.message.decode('utf-8')) return count @property def is_trivial_compound_itf(self): ''' Returns True if the interface is a compound interface (bond or bridge), and its configuration is trivial, without any variation from the defaults. 
''' return bool(lib._netplan_netdef_is_trivial_compound_itf(self._ptr)) class _NetdefIterator: _abi_loaded = False @classmethod def _load_abi(cls): if cls._abi_loaded: return if not hasattr(lib, '_netplan_iter_defs_per_devtype_init'): # pragma: nocover (hard to unit-test against the WRONG lib) raise LibNetplanException(''' The current version of libnetplan does not allow iterating by devtype. Please ensure that both the netplan CLI package and its library are up to date. ''') lib._netplan_state_new_netdef_pertype_iter.argtypes = [_NetplanStateP, c_char_p] lib._netplan_state_new_netdef_pertype_iter.restype = c_void_p lib._netplan_iter_defs_per_devtype_next.argtypes = [c_void_p] lib._netplan_iter_defs_per_devtype_next.restype = _NetplanNetDefinitionP lib._netplan_iter_defs_per_devtype_free.argtypes = [c_void_p] lib._netplan_iter_defs_per_devtype_free.restype = None lib._netplan_netdef_id.argtypes = [c_void_p] lib._netplan_netdef_id.restype = c_char_p cls._abi_loaded = True def __init__(self, np_state, devtype): self._load_abi() # To keep things valid, keep a reference to the parent state self.np_state = np_state self.iterator = lib._netplan_state_new_netdef_pertype_iter(np_state._ptr, devtype and devtype.encode('utf-8')) def __del__(self): lib._netplan_iter_defs_per_devtype_free(self.iterator) def __iter__(self): return self def __next__(self): next_value = lib._netplan_iter_defs_per_devtype_next(self.iterator) if not next_value: raise StopIteration return NetDefinition(self.np_state, next_value) lib.netplan_util_create_yaml_patch.argtypes = [c_char_p, c_char_p, c_int, _GErrorPP] lib.netplan_util_create_yaml_patch.restype = c_int lib.netplan_util_dump_yaml_subtree.argtypes = [c_char_p, c_int, c_int, _GErrorPP] lib.netplan_util_dump_yaml_subtree.restype = c_int def create_yaml_patch(patch_object_path: List[str], patch_payload: str, patch_output): _checked_lib_call(lib.netplan_util_create_yaml_patch, '\t'.join(patch_object_path).encode('utf-8'), 
patch_payload.encode('utf-8'), patch_output.fileno()) def dump_yaml_subtree(prefix, input_file, output_file): _checked_lib_call(lib.netplan_util_dump_yaml_subtree, prefix.encode('utf-8'), input_file.fileno(), output_file.fileno()) PK ! �i-� � netplan/__init__.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2018 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from netplan.cli.core import Netplan __all__ = [Netplan] PK ! �$<�O �O netplan/cli/commands/apply.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2018-2020 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # Author: Łukasz 'sil2100' Zemczak <lukasz.zemczak@canonical.com> # Author: Lukas 'slyon' Märdian <lukas.maerdian@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. 
'''netplan apply command line''' import logging import os import sys import glob import subprocess import shutil import netifaces import time import netplan.cli.utils as utils from netplan.configmanager import ConfigManager, ConfigurationError from netplan.cli.sriov import apply_sriov_config from netplan.cli.ovs import OvsDbServerNotRunning, apply_ovs_cleanup OVS_CLEANUP_SERVICE = 'netplan-ovs-cleanup.service' IF_NAMESIZE = 16 class NetplanApply(utils.NetplanCommand): def __init__(self): super().__init__(command_id='apply', description='Apply current netplan config to running system', leaf=True) self.sriov_only = False self.only_ovs_cleanup = False self.state = None # to be filled by the '--state' argument def run(self): # pragma: nocover (covered in autopkgtest) self.parser.add_argument('--sriov-only', action='store_true', help='Only apply SR-IOV related configuration and exit') self.parser.add_argument('--only-ovs-cleanup', action='store_true', help='Only clean up old OpenVSwitch interfaces and exit') self.parser.add_argument('--state', help='Directory containing previous YAML configuration') self.func = self.command_apply self.parse_args() self.run_command() def command_apply(self, run_generate=True, sync=False, exit_on_error=True, state_dir=None): # pragma: nocover config_manager = ConfigManager() if state_dir: self.state = state_dir # For certain use-cases, we might want to only apply specific configuration. # If we only need SR-IOV configuration, do that and exit early. if self.sriov_only: NetplanApply.process_sriov_config(config_manager, exit_on_error) return # If we only need OpenVSwitch cleanup, do that and exit early. elif self.only_ovs_cleanup: NetplanApply.process_ovs_cleanup(config_manager, False, False, exit_on_error) return # if we are inside a snap, then call dbus to run netplan apply instead if "SNAP" in os.environ: # TODO: maybe check if we are inside a classic snap and don't do # this if we are in a classic snap? 
busctl = shutil.which("busctl") if busctl is None: raise RuntimeError("missing busctl utility") # XXX: DO NOT TOUCH or change this API call, it is used by snapd to communicate # using core20 netplan binary/client/CLI on core18 base systems. Any change # must be agreed upon with the snapd team, so we don't break support for # base systems running older netplan versions. # https://github.com/snapcore/snapd/pull/5915 res = subprocess.call([busctl, "call", "--quiet", "--system", "io.netplan.Netplan", # the service "/io/netplan/Netplan", # the object "io.netplan.Netplan", # the interface "Apply", # the method ]) if res != 0: if exit_on_error: sys.exit(res) elif res == 130: raise PermissionError( "failed to communicate with dbus service") else: raise RuntimeError( "failed to communicate with dbus service: error %s" % res) else: return ovs_cleanup_service = '/run/systemd/system/netplan-ovs-cleanup.service' old_files_networkd = bool(glob.glob('/run/systemd/network/*netplan-*')) old_ovs_glob = glob.glob('/run/systemd/system/netplan-ovs-*') # Ignore netplan-ovs-cleanup.service, as it can always be there if ovs_cleanup_service in old_ovs_glob: old_ovs_glob.remove(ovs_cleanup_service) old_files_ovs = bool(old_ovs_glob) old_nm_glob = glob.glob('/run/NetworkManager/system-connections/netplan-*') nm_ifaces = utils.nm_interfaces(old_nm_glob, netifaces.interfaces()) old_files_nm = bool(old_nm_glob) generator_call = [] generate_out = None if 'NETPLAN_PROFILE' in os.environ: generator_call.extend(['valgrind', '--leak-check=full']) generate_out = subprocess.STDOUT generator_call.append(utils.get_generator_path()) if run_generate and subprocess.call(generator_call, stderr=generate_out) != 0: if exit_on_error: sys.exit(os.EX_CONFIG) else: raise ConfigurationError("the configuration could not be generated") devices = netifaces.interfaces() # Re-start service when # 1. We have configuration files for it # 2. 
Previously we had config files for it but not anymore # Ideally we should compare the content of the *netplan-* files before and # after generation to minimize the number of re-starts, but the conditions # above works too. restart_networkd = bool(glob.glob('/run/systemd/network/*netplan-*')) if not restart_networkd and old_files_networkd: restart_networkd = True restart_ovs_glob = glob.glob('/run/systemd/system/netplan-ovs-*') # Ignore netplan-ovs-cleanup.service, as it can always be there if ovs_cleanup_service in restart_ovs_glob: restart_ovs_glob.remove(ovs_cleanup_service) restart_ovs = bool(restart_ovs_glob) if not restart_ovs and old_files_ovs: # OVS is managed via systemd units restart_networkd = True restart_nm_glob = glob.glob('/run/NetworkManager/system-connections/netplan-*') nm_ifaces.update(utils.nm_interfaces(restart_nm_glob, devices)) restart_nm = bool(restart_nm_glob) if not restart_nm and old_files_nm: restart_nm = True # Running 'systemctl daemon-reload' will re-run the netplan systemd generator, # so let's make sure we only run it iff we're willing to run 'netplan generate' if run_generate: utils.systemctl_daemon_reload() # stop backends if restart_networkd: logging.debug('netplan generated networkd configuration changed, reloading networkd') # Clean up any old netplan related OVS ports/bonds/bridges, if applicable NetplanApply.process_ovs_cleanup(config_manager, old_files_ovs, restart_ovs, exit_on_error) wpa_services = ['netplan-wpa-*.service'] # Historically (up to v0.98) we had netplan-wpa@*.service files, in case of an # upgraded system, we need to make sure to stop those. 
if utils.systemctl_is_active('netplan-wpa@*.service'): wpa_services.insert(0, 'netplan-wpa@*.service') utils.systemctl('stop', wpa_services, sync=sync) else: logging.debug('no netplan generated networkd configuration exists') if restart_nm: logging.debug('netplan generated NM configuration changed, restarting NM') if utils.nm_running(): # restarting NM does not cause new config to be applied, need to shut down devices first for device in devices: if device not in nm_ifaces: continue # do not touch this interface # ignore failures here -- some/many devices might not be managed by NM try: utils.nmcli(['device', 'disconnect', device]) except subprocess.CalledProcessError: pass utils.systemctl_network_manager('stop', sync=sync) else: logging.debug('no netplan generated NM configuration exists') # Refresh devices now; restarting a backend might have made something appear. devices = netifaces.interfaces() # evaluate config for extra steps we need to take (like renaming) # for now, only applies to non-virtual (real) devices. config_manager.parse() changes = NetplanApply.process_link_changes(devices, config_manager) # delete virtual interfaces that have been defined in a previous state # but are not configured anymore in the current YAML if self.state: cm = ConfigManager(self.state) cm.parse() # get previous configuration state prev_links = cm.virtual_interfaces.keys() curr_links = config_manager.virtual_interfaces.keys() NetplanApply.clear_virtual_links(prev_links, curr_links, devices) # if the interface is up, we can still apply some .link file changes # but we cannot apply the interface rename via udev, as it won't touch # the interface name, if it was already renamed once (e.g. 
during boot), # because of the NamePolicy=keep default: # https://www.freedesktop.org/software/systemd/man/systemd.net-naming-scheme.html devices = netifaces.interfaces() for device in devices: logging.debug('netplan triggering .link rules for %s', device) try: subprocess.check_call(['udevadm', 'test-builtin', 'net_setup_link', '/sys/class/net/' + device], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.check_call(['udevadm', 'test', '/sys/class/net/' + device], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except subprocess.CalledProcessError: logging.debug('Ignoring device without syspath: %s', device) devices_after_udev = netifaces.interfaces() # apply some more changes manually for iface, settings in changes.items(): # rename non-critical network interfaces new_name = settings.get('name') if new_name: if len(new_name) >= IF_NAMESIZE: logging.warning('Interface name {} is too long. {} will not be renamed'.format(new_name, iface)) continue if iface in devices and new_name in devices_after_udev: logging.debug('Interface rename {} -> {} already happened.'.format(iface, new_name)) continue # re-name already happened via 'udevadm test' # bring down the interface, using its current (matched) interface name subprocess.check_call(['ip', 'link', 'set', 'dev', iface, 'down'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # rename the interface to the name given via 'set-name' subprocess.check_call(['ip', 'link', 'set', 'dev', iface, 'name', settings.get('name')], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) # Reloading of udev rules happens during 'netplan generate' already # subprocess.check_call(['udevadm', 'control', '--reload-rules']) subprocess.check_call(['udevadm', 'trigger', '--attr-match=subsystem=net']) subprocess.check_call(['udevadm', 'settle']) # apply any SR-IOV related changes, if applicable NetplanApply.process_sriov_config(config_manager, exit_on_error) # (re)set global regulatory domain if 
os.path.exists('/run/systemd/system/netplan-regdom.service'): utils.systemctl('start', ['netplan-regdom.service']) # (re)start backends if restart_networkd: netplan_wpa = [os.path.basename(f) for f in glob.glob('/run/systemd/system/*.wants/netplan-wpa-*.service')] # exclude the special 'netplan-ovs-cleanup.service' unit netplan_ovs = [os.path.basename(f) for f in glob.glob('/run/systemd/system/*.wants/netplan-ovs-*.service') if not f.endswith('/' + OVS_CLEANUP_SERVICE)] # Run 'systemctl start' command synchronously, to avoid race conditions # with 'oneshot' systemd service units, e.g. netplan-ovs-*.service. try: utils.networkctl_reload() utils.networkctl_reconfigure(utils.networkd_interfaces()) except subprocess.CalledProcessError: # (re-)start systemd-networkd if it is not running, yet logging.warning('Falling back to a hard restart of systemd-networkd.service') utils.systemctl('restart', ['systemd-networkd.service'], sync=True) # 1st: execute OVS cleanup, to avoid races while applying OVS config utils.systemctl('start', [OVS_CLEANUP_SERVICE], sync=True) # 2nd: start all other services utils.systemctl('start', netplan_wpa + netplan_ovs, sync=True) if restart_nm: # Flush all IP addresses of NM managed interfaces, to avoid NM creating # new, non netplan-* connection profiles, using the existing IPs. 
for iface in utils.nm_interfaces(restart_nm_glob, devices): utils.ip_addr_flush(iface) # clear NM state, especially the [device].managed=true config, as that might have been # re-set via an udev rule setting "NM_UNMANAGED=1" shutil.rmtree('/run/NetworkManager/devices', ignore_errors=True) utils.systemctl_network_manager('start', sync=sync) if sync: # 'nmcli' could be /usr/bin/nmcli or # /snap/bin/nmcli -> /snap/bin/network-manager.nmcli cmd = ['nmcli', 'general', 'status'] # wait a bit for 'connected (site/local-only)' or # 'connected' to appear in 'nmcli general' STATE for _ in range(10): out = subprocess.run(cmd, capture_output=True, text=True) # Handle nmcli's "not running" return code (8) gracefully, # giving some more time for NetworkManager startup if out.returncode == 8: time.sleep(1) continue if '\nconnected' in str(out.stdout): break time.sleep(0.5) @staticmethod def is_composite_member(composites, phy): """ Is this physical interface a member of a 'composite' virtual interface? (bond, bridge) """ for composite in composites: for _, settings in composite.items(): if not type(settings) is dict: continue members = settings.get('interfaces', []) for iface in members: if iface == phy: return True return False @staticmethod def clear_virtual_links(prev_links, curr_links, devices=[]): """ Calculate the delta of virtual links. And remove the links that were dropped from the YAML config, if they were not dropped by the backend already. We can make use of the netplan netdef ids, as those equal the interface name for virtual links. """ if not devices: logging.warning('Cannot clear virtual links: no network interfaces provided.') return [] dropped_interfaces = list(set(prev_links) - set(curr_links)) # some interfaces might have been cleaned up already, e.g. 
by the # NetworkManager backend interfaces_to_clear = list(set(dropped_interfaces).intersection(devices)) for link in interfaces_to_clear: try: cmd = ['ip', 'link', 'delete', 'dev', link] subprocess.check_call(cmd) except subprocess.CalledProcessError: logging.warning('Could not delete interface {}'.format(link)) return dropped_interfaces @staticmethod def process_link_changes(interfaces, config_manager: ConfigManager): # pragma: nocover (covered in autopkgtest) """ Go through the pending changes and pick what needs special handling. Only applies to non-critical interfaces which can be safely updated. """ changes = {} composite_interfaces = [config_manager.bridges, config_manager.bonds] # Find physical interfaces which need a rename # But do not rename virtual interfaces for netdef in config_manager.physical_interfaces.values(): newname = netdef.set_name if not newname: continue # Skip if no new name needs to be set if not netdef.has_match: continue # Skip if no match for current name is given if NetplanApply.is_composite_member(composite_interfaces, netdef.id): logging.debug('Skipping composite member {}'.format(netdef.id)) # do not rename members of virtual devices. MAC addresses # may be the same for all interface members. 
continue # Find current name of the interface, according to match conditions and globs (name, mac, driver) current_iface_name = utils.find_matching_iface(interfaces, netdef) if not current_iface_name: logging.warning('Cannot find unique matching interface for {}'.format(netdef.id)) continue if current_iface_name == newname: # Skip interface if it already has the correct name logging.debug('Skipping correctly named interface: {}'.format(newname)) continue if netdef.critical: # Skip interfaces defined as critical, as we should not take them down in order to rename logging.warning('Cannot rename {} ({} -> {}) at runtime (needs reboot), due to being critical' .format(netdef.id, current_iface_name, newname)) continue # record the interface rename change changes[current_iface_name] = {'name': newname} logging.debug('Link changes: {}'.format(changes)) return changes @staticmethod def process_sriov_config(config_manager, exit_on_error=True): # pragma: nocover (covered in autopkgtest) try: apply_sriov_config(config_manager) except utils.config_errors as e: logging.error(str(e)) if exit_on_error: sys.exit(1) @staticmethod def process_ovs_cleanup(config_manager, ovs_old, ovs_current, exit_on_error=True): # pragma: nocover (autopkgtest) try: apply_ovs_cleanup(config_manager, ovs_old, ovs_current) except (OSError, RuntimeError) as e: logging.error(str(e)) if exit_on_error: sys.exit(1) except OvsDbServerNotRunning as e: logging.warning('Cannot call Open vSwitch: {}.'.format(e)) PK ! �2�9c c netplan/cli/commands/ip.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2018 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. 
# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. '''netplan ip command line''' import logging import os import sys import subprocess from subprocess import CalledProcessError import netplan.cli.utils as utils lease_path = { 'networkd': { 'pattern': 'run/systemd/netif/leases/{lease_id}', 'method': 'ifindex', }, 'NetworkManager': { 'pattern': 'var/lib/NetworkManager/internal-{lease_id}-{interface}.lease', 'method': 'nm_connection', }, } class NetplanIp(utils.NetplanCommand): def __init__(self): super().__init__(command_id='ip', description='Retrieve IP information from the system', leaf=False) def run(self): self.command_leases = NetplanIpLeases() # subcommand: leases p_ip_leases = self.subparsers.add_parser('leases', help='Display IP leases', add_help=False) p_ip_leases.set_defaults(func=self.command_leases.run, commandclass=self.command_leases) self.parse_args() self.run_command() class NetplanIpLeases(utils.NetplanCommand): def __init__(self): super().__init__(command_id='ip leases', description='Display IP leases', leaf=True) def run(self): self.parser.add_argument('interface', help='Interface for which to display IP lease settings.') self.parser.add_argument('--root-dir', help='Search for configuration files in this root directory instead of /') self.func = self.command_ip_leases self.parse_args() self.run_command() def command_ip_leases(self): if self.interface == 'help': # pragma: nocover (covered in autopkgtest) self.print_usage() def find_lease_file(mapping): def lease_method_ifindex(): ifindex_f = os.path.join('/sys/class/net', self.interface, 'ifindex') try: with open(ifindex_f) as f: return f.readlines()[0].strip() except 
Exception as e: logging.debug('Cannot read file %s: %s', ifindex_f, str(e)) raise def lease_method_nm_connection(): # FIXME: handle older versions of NM where 'nmcli dev show' doesn't exist try: nmcli_dev_out = utils.nmcli_out(['dev', 'show', self.interface]) for line in nmcli_dev_out.splitlines(): if 'GENERAL.CONNECTION' in line: conn_id = line.split(':')[1].rstrip().strip() nmcli_con_out = utils.nmcli_out(['con', 'show', 'id', conn_id]) for line in nmcli_con_out.splitlines(): if 'connection.uuid' in line: return line.split(':')[1].rstrip().strip() except Exception as e: raise Exception('Could not find a NetworkManager connection for the interface: %s' % str(e)) raise Exception('Could not find a NetworkManager connection for the interface') lease_pattern = lease_path[mapping['backend']]['pattern'] lease_method = lease_path[mapping['backend']]['method'] try: lease_id = eval("lease_method_" + lease_method)() # We found something to build the path to the lease file with, # at this point we may have something to look at; but if not, # we'll rely on open() throwing an error. # This might happen if networkd doesn't use DHCP for the interface, # for instance. path = os.path.join('/', os.path.abspath(self.root_dir) if self.root_dir else "", lease_pattern.format(interface=self.interface, lease_id=lease_id)) # Fallback to 'dhclient' if no lease of NetworkManager's # internal DHCP client is found if not os.path.isfile(path): path = path.replace('NetworkManager/internal-', 'NetworkManager/dhclient-') with open(path) as f: for line in f.readlines(): print(line.rstrip()) except Exception as e: print("No lease found for interface '%s': %s" % (self.interface, str(e)), file=sys.stderr) sys.exit(1) argv = [utils.get_generator_path()] if self.root_dir: argv += ['--root-dir', self.root_dir] argv += ['--mapping', self.interface] # Extract out of the generator our mapping in a dict. 
logging.debug('command ip leases: running %s', argv) try: out = subprocess.check_output(argv, text=True) except CalledProcessError: # pragma: nocover (better be covered in autopkgtest) print("No lease found for interface '%s' (not managed by Netplan)" % self.interface, file=sys.stderr) sys.exit(1) mapping = {} mapping_s = out.split(',') for keyvalue in mapping_s: key, value = keyvalue.strip().split('=') mapping[key] = value find_lease_file(mapping) PK ! �E�� � netplan/cli/commands/info.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2019 Canonical, Ltd. # Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. 
'''netplan info command line''' import netplan.cli.utils as utils import netplan._features class NetplanInfo(utils.NetplanCommand): def __init__(self): super().__init__(command_id='info', description='Show available features', leaf=True) def run(self): # pragma: nocover (covered in autopkgtest) format_group = self.parser.add_mutually_exclusive_group(required=False) format_group.add_argument('--json', dest='version_format', action='store_const', const='json', help='Output version and features in JSON format') format_group.add_argument('--yaml', dest='version_format', action='store_const', const='yaml', help='Output version and features in YAML format') self.func = self.command_info self.parse_args() self.run_command() def command_info(self): netplan_version = { 'netplan.io': { 'website': 'https://netplan.io/', } } flags = netplan._features.NETPLAN_FEATURE_FLAGS netplan_version['netplan.io'].update({'features': flags}) # Default to output in YAML format. if self.version_format is None: self.version_format = 'yaml' if self.version_format == 'json': import json print(json.dumps(netplan_version, indent=2)) elif self.version_format == 'yaml': print('''netplan.io: website: "{}" features:'''.format(netplan_version['netplan.io']['website'])) for feature in netplan._features.NETPLAN_FEATURE_FLAGS: print(' - ' + feature) PK ! ��~��k �k netplan/cli/commands/status.pynu �[��� #!/usr/bin/python3 # # Copyright (C) 2022 Canonical, Ltd. # Author: Lukas Märdian <slyon@ubuntu.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 3. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. 
# # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. '''netplan status command line''' import ipaddress import json import logging import re import socket import subprocess import sys from typing import Union, Dict, List, Type import dbus import yaml import netplan.cli.utils as utils JSON = Union[Dict[str, 'JSON'], List['JSON'], int, str, float, bool, Type[None]] MATCH_TAGS = re.compile(r'\[([a-z0-9]+)\].*\[\/\1\]') RICH_OUTPUT = False try: from rich.console import Console from rich.highlighter import RegexHighlighter from rich.theme import Theme class NetplanHighlighter(RegexHighlighter): base_style = 'netplan.' highlights = [ r'(^|[\s\/])(?P<int>\d+)([\s:]?\s|$)', r'(?P<str>(\"|\').+(\"|\'))', ] RICH_OUTPUT = True except ImportError: # pragma: nocover (we mock RICH_OUTPUT, ignore the logging) logging.debug("python3-rich not found, falling back to plain output") class Interface(): def __extract_mac(self, ip: dict) -> str: ''' Extract the MAC address if it's set inside the JSON data and seems to have the correct format. Return 'None' otherwise. 
''' if len(address := ip.get('address', '')) == 17: # 6 byte MAC (+5 colons) return address.lower() return None def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [], resolved_data: tuple = (None, None), route_data: tuple = (None, None)): self.idx: int = ip.get('ifindex', -1) self.name: str = ip.get('ifname', 'unknown') self.adminstate: str = 'UP' if 'UP' in ip.get('flags', []) else 'DOWN' self.operstate: str = ip.get('operstate', 'unknown').upper() self.macaddress: str = self.__extract_mac(ip) # Filter networkd/NetworkManager data nm_data = nm_data or [] # avoid 'None' value on systems without NM self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None) self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None) # Filter resolved's DNS data self.dns_addresses: list = None if resolved_data[0]: self.dns_addresses = [] for itr in resolved_data[0]: if int(itr[0]) == int(self.idx): ipfamily = itr[1] dns = itr[2] self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns]))) self.dns_search: list = None if resolved_data[1]: self.dns_search = [] for v in resolved_data[1]: if int(v[0]) == int(self.idx): self.dns_search.append(str(v[1])) # Filter route data _routes: list = [] self.routes: list = None if route_data[0]: _routes += route_data[0] if route_data[1]: _routes += route_data[1] if _routes: self.routes = [] for obj in _routes: if obj.get('dev') == self.name: elem = {'to': obj.get('dst')} val = obj.get('gateway') if val: elem['via'] = val val = obj.get('prefsrc') if val: elem['from'] = val val = obj.get('metric') if val: elem['metric'] = val val = obj.get('type') if val: elem['type'] = val val = obj.get('scope') if val: elem['scope'] = val val = obj.get('protocol') if val: elem['protocol'] = val self.routes.append(elem) self.addresses: list = None if addr_info := ip.get('addr_info'): self.addresses = [] for addr in addr_info: flags: list = [] if 
ipaddress.ip_address(addr['local']).is_link_local: flags.append('link') if self.routes: for route in self.routes: if ('from' in route and ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])): if route['protocol'] == 'dhcp': flags.append('dhcp') break ip_addr = addr['local'].lower() elem = {ip_addr: {'prefix': addr['prefixlen']}} if flags: elem[ip_addr]['flags'] = flags self.addresses.append(elem) self.iproute_type: str = None if info_kind := ip.get('linkinfo', {}).get('info_kind'): self.iproute_type = info_kind.strip() # workaround: query some data which is not available via networkctl's JSON output self._networkctl: str = self.query_networkctl(self.name) or '' def query_nm_ssid(self, con_name: str) -> str: ssid: str = None try: ssid = utils.nmcli_out(['--get-values', '802-11-wireless.ssid', 'con', 'show', 'id', con_name]) return ssid.strip() except Exception as e: logging.warning('Cannot query NetworkManager SSID for {}: {}'.format( con_name, str(e))) return ssid def query_networkctl(self, ifname: str) -> str: output: str = None try: output = subprocess.check_output(['networkctl', 'status', '--', ifname], text=True) except Exception as e: logging.warning('Cannot query networkctl for {}: {}'.format( ifname, str(e))) return output def json(self) -> JSON: json = { 'index': self.idx, 'adminstate': self.adminstate, 'operstate': self.operstate, } if self.type: json['type'] = self.type if self.ssid: json['ssid'] = self.ssid if self.tunnel_mode: json['tunnel_mode'] = self.tunnel_mode if self.backend: json['backend'] = self.backend if self.netdef_id: json['id'] = self.netdef_id if self.macaddress: json['macaddress'] = self.macaddress if self.vendor: json['vendor'] = self.vendor if self.addresses: json['addresses'] = self.addresses if self.dns_addresses: json['dns_addresses'] = self.dns_addresses if self.dns_search: json['dns_search'] = self.dns_search if self.routes: json['routes'] = self.routes if self.activation_mode: json['activation_mode'] = 
self.activation_mode return (self.name, json) @property def up(self) -> bool: return self.adminstate == 'UP' and self.operstate == 'UP' @property def down(self) -> bool: return self.adminstate == 'DOWN' and self.operstate == 'DOWN' @property def type(self) -> str: match = dict({ 'bond': 'bond', 'bridge': 'bridge', 'ether': 'ethernet', 'ipgre': 'tunnel', 'ip6gre': 'tunnel', 'loopback': 'ethernet', 'sit': 'tunnel', 'tunnel': 'tunnel', 'tunnel6': 'tunnel', 'wireguard': 'tunnel', 'wlan': 'wifi', 'wwan': 'modem', 'vrf': 'vrf', 'vxlan': 'tunnel', }) nd_type = self.nd.get('Type') if self.nd else None if nd_type in match: return match[nd_type] logging.warning('Unknown device type: {}'.format(nd_type)) return None @property def tunnel_mode(self) -> str: if self.type == 'tunnel' and self.iproute_type: return self.iproute_type return None @property def backend(self) -> str: if (self.nd and 'unmanaged' not in self.nd.get('SetupState', '') and 'run/systemd/network/10-netplan-' in self.nd.get('NetworkFile', '')): return 'networkd' elif self.nm and 'run/NetworkManager/system-connections/netplan-' in self.nm.get('filename', ''): return 'NetworkManager' return None @property def netdef_id(self) -> str: if self.backend == 'networkd': return self.nd.get('NetworkFile', '').split( 'run/systemd/network/10-netplan-')[1].split('.network')[0] elif self.backend == 'NetworkManager': netdef = self.nm.get('filename', '').split( 'run/NetworkManager/system-connections/netplan-')[1].split('.nmconnection')[0] if self.nm.get('type', '') == '802-11-wireless': ssid = self.query_nm_ssid(self.nm.get('name')) if ssid: # XXX: escaping needed? 
netdef = netdef.split('-' + ssid)[0] return netdef return None @property def vendor(self) -> str: if self.nd and 'Vendor' in self.nd and self.nd['Vendor']: return self.nd['Vendor'].strip() return None @property def ssid(self) -> str: if self.type == 'wifi': # XXX: available from networkctl's JSON output as of v250: # https://github.com/systemd/systemd/commit/da7c995 for line in self._networkctl.splitlines(): line = line.strip() key = 'WiFi access point: ' if line.startswith(key): ssid = line[len(key):-len(' (xB:SS:ID:xx:xx:xx)')].strip() return ssid if ssid else None return None @property def activation_mode(self) -> str: if self.backend == 'networkd': # XXX: available from networkctl's JSON output as of v250: # https://github.com/systemd/systemd/commit/3b60ede for line in self._networkctl.splitlines(): line = line.strip() key = 'Activation Policy: ' if line.startswith(key): mode = line[len(key):].strip() return mode if mode != 'up' else None # XXX: this is not fully supported on NetworkManager, only 'manual'/'up' elif self.backend == 'NetworkManager': return 'manual' if self.nm['autoconnect'] == 'no' else None return None class NetplanStatus(utils.NetplanCommand): def __init__(self): super().__init__(command_id='status', description='Query networking state of the running system', leaf=True) self.all = False def run(self): self.parser.add_argument('ifname', nargs='?', type=str, default=None, help='Show only this interface') self.parser.add_argument('-a', '--all', action='store_true', help='Show all interface data (incl. 
inactive)') self.parser.add_argument('-f', '--format', default='tabular', help='Output in machine readable `json` or `yaml` format') self.func = self.command self.parse_args() self.run_command() def resolvconf_json(self) -> dict: res = { 'addresses': [], 'search': [], 'mode': None, } try: with open('/etc/resolv.conf') as f: # check first line for systemd-resolved stub or compat modes firstline = f.readline() if '# This is /run/systemd/resolve/stub-resolv.conf' in firstline: res['mode'] = 'stub' elif '# This is /run/systemd/resolve/resolv.conf' in firstline: res['mode'] = 'compat' for line in [firstline] + f.readlines(): if line.startswith('nameserver'): res['addresses'] += line.split()[1:] # append if line.startswith('search'): res['search'] = line.split()[1:] # override except Exception as e: logging.warning('Cannot parse /etc/resolv.conf: {}'.format(str(e))) return res def query_online_state(self, interfaces: list) -> bool: # TODO: fully implement network-online.target specification (FO020): # https://discourse.ubuntu.com/t/spec-definition-of-an-online-system/27838 for itf in interfaces: if itf.up and itf.addresses and itf.routes and itf.dns_addresses: non_local_ips = [] for addr in itf.addresses: ip, extra = list(addr.items())[0] if 'flags' not in extra or 'link' not in extra['flags']: non_local_ips.append(ip) default_routes = [x for x in itf.routes if x.get('to', None) == 'default'] if non_local_ips and default_routes and itf.dns_addresses: return True return False def process_generic(self, cmd_output: str) -> JSON: return json.loads(cmd_output) def query_iproute2(self) -> JSON: data: JSON = None try: output: str = subprocess.check_output(['ip', '-d', '-j', 'addr'], text=True) data = self.process_generic(output) except Exception as e: logging.critical('Cannot query iproute2 interface data: {}'.format(str(e))) return data def process_networkd(self, cmd_output) -> JSON: return json.loads(cmd_output)['Interfaces'] def query_networkd(self) -> JSON: data: JSON = 
None try: output: str = subprocess.check_output(['networkctl', '--json=short'], text=True) data = self.process_networkd(output) except Exception as e: logging.critical('Cannot query networkd interface data: {}'.format(str(e))) return data def process_nm(self, cmd_output) -> JSON: data: JSON = [] for line in cmd_output.splitlines(): split = line.split(':') dev = split[0] if split[0] else None if dev: # ignore inactive connection profiles data.append({ 'device': dev, 'name': split[1], 'uuid': split[2], 'filename': split[3], 'type': split[4], 'autoconnect': split[5], }) return data def query_nm(self) -> JSON: data: JSON = None try: output: str = utils.nmcli_out(['-t', '-f', 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', 'con', 'show']) data = self.process_nm(output) except Exception as e: logging.debug('Cannot query NetworkManager interface data: {}'.format(str(e))) return data def query_routes(self) -> tuple: data4 = None data6 = None try: output4: str = subprocess.check_output(['ip', '-d', '-j', 'route'], text=True) data4: JSON = self.process_generic(output4) output6: str = subprocess.check_output(['ip', '-d', '-j', '-6', 'route'], text=True) data6: JSON = self.process_generic(output6) except Exception as e: logging.debug('Cannot query iproute2 route data: {}'.format(str(e))) return (data4, data6) def query_resolved(self) -> tuple: addresses = None search = None try: ipc = dbus.SystemBus() resolve1 = ipc.get_object('org.freedesktop.resolve1', '/org/freedesktop/resolve1') resolve1_if = dbus.Interface(resolve1, 'org.freedesktop.DBus.Properties') res = resolve1_if.GetAll('org.freedesktop.resolve1.Manager') addresses = res['DNS'] search = res['Domains'] except Exception as e: logging.debug('Cannot query resolved DNS data: {}'.format(str(e))) return (addresses, search) def plain_print(self, *args, **kwargs): if len(args): lst = list(args) for tag in MATCH_TAGS.findall(lst[0]): # remove matching opening and closing tag lst[0] = lst[0].replace('[{}]'.format(tag), '')\ 
# NOTE: the text replaced by this block starts with the tail of plain_print(),
# whose definition begins before this span. Those trailing statements were:
#     .replace('[/{}]'.format(tag), '')
#     return print(*lst, **kwargs)
#     return print(*args, **kwargs)
#
# The two functions below are methods of the status command class; the class
# header is not part of this span.

def pretty_print(self, data: JSON, total: int, _console_width=None) -> None:
    """Render the collected status data as human readable text.

    data maps interface names to their attribute dicts; the special key
    'netplan-global-state' carries the online state and the global name
    server settings.  total is the number of interfaces known to the caller
    and is only used to report how many of them are not part of data.
    _console_width is handed to the rich Console when rich output is used.
    """
    if RICH_OUTPUT:
        # TODO: Use a proper (subiquity?) color palette
        palette = Theme({
            'netplan.int': 'bold cyan',
            'netplan.str': 'yellow',
            'muted': 'grey62',
            'online': 'green bold',
            'offline': 'red bold',
            'unknown': 'yellow bold',
            'highlight': 'bold'
        })
        pprint = Console(highlighter=NetplanHighlighter(), theme=palette,
                         width=_console_width, emoji=False).print
    else:
        pprint = self.plain_print

    pad = '18'
    # every row starts with a right-aligned title column followed by a space
    label = '{title:>' + pad + '} '

    def print_values(title, values):
        # one value per row; only the first row carries the title
        for pos, item in enumerate(values or ()):
            pprint((label + '{value}').format(
                title=title if pos == 0 else '',
                value=item,
            ))

    global_key = 'netplan-global-state'
    global_state = data.get(global_key, {})
    interfaces = [(name, info) for name, info in data.items() if name != global_key]

    # Global state
    if global_state.get('online', False):
        online_markup = '[online]online[/online]'
    else:
        online_markup = '[offline]offline[/offline]'
    pprint((label + '{value}').format(title='Online state:', value=online_markup))

    nameservers = global_state.get('nameservers', {})
    dns_addr: list = nameservers.get('addresses', [])
    dns_mode: str = nameservers.get('mode')
    dns_search: list = nameservers.get('search', [])
    mode_suffix = ' ({})'.format(dns_mode) if dns_mode else ''
    for pos, address in enumerate(dns_addr or ()):
        pprint((label + '{value}[muted]{mode}[/muted]').format(
            title='DNS Addresses:' if pos == 0 else '',
            value=address,
            mode=mode_suffix,
        ))
    print_values('DNS Search:', dns_search)
    pprint()

    # Per interface
    known_states = {
        'UP/UP': ('UP', 'online'),
        'DOWN/DOWN': ('DOWN', 'offline'),
    }
    for ifname, info in interfaces:
        combined = info.get('operstate', 'UNKNOWN') + '/' + info.get('adminstate', 'UNKNOWN')
        state, scolor = known_states.get(combined, (combined, 'unknown'))

        full_type = info.get('type', 'other')
        ssid = info.get('ssid')
        tunnel_mode = info.get('tunnel_mode')
        if full_type == 'wifi' and ssid:
            full_type += ('/"' + ssid + '"')
        elif full_type == 'tunnel' and tunnel_mode:
            full_type += ('/' + tunnel_mode)

        if info.get('id'):
            netdef = ': [highlight]{}[/highlight]'.format(info.get('id'))
        else:
            netdef = ''
        pprint('[{col}]●[/{col}] {idx:>2}: {name} {type} [{col}]{state}[/{col}] ({backend}{netdef})'.format(
            col=scolor,
            idx=info.get('index', '?'),
            name=ifname,
            type=full_type,
            state=state,
            backend=info.get('backend', 'unmanaged'),
            netdef=netdef,
        ))

        if info.get('macaddress'):
            if info.get('vendor'):
                vendor = ' ({})'.format(info.get('vendor', ''))
            else:
                vendor = ''
            pprint((label + '{mac}[muted]{vendor}[/muted]').format(
                title='MAC Address:',
                mac=info.get('macaddress', ''),
                vendor=vendor,
            ))

        for pos, entry in enumerate(info.get('addresses', []) or ()):
            ip, extra = list(entry.items())[0]  # get first (and only) address
            flags = extra.get('flags') or []
            # addresses without flags, or obtained via dhcp, are highlighted
            emphasize = not flags or 'dhcp' in flags
            pprint((label + '{start}{ip}/{prefix}{end}[muted]{extra}[/muted]').format(
                title='Addresses:' if pos == 0 else '',
                ip=ip,
                prefix=extra.get('prefix', ''),
                extra=' (' + ', '.join(flags) + ')' if flags else '',
                start='[highlight]' if emphasize else '',
                end='[/highlight]' if emphasize else '',
            ))

        print_values('DNS Addresses:', info.get('dns_addresses', []))
        print_values('DNS Search:', info.get('dns_search', []))

        for pos, route in enumerate(info.get('routes', []) or ()):
            is_default = route['to'] == 'default'
            via = ' via ' + route['via'] if 'via' in route else ''
            src = ' from ' + route['from'] if 'from' in route else ''
            metric = ' metric ' + str(route['metric']) if 'metric' in route else ''
            # only attributes that differ from these values are listed
            details = [
                route[key]
                for key, usual in (('protocol', 'kernel'), ('scope', 'global'), ('type', 'unicast'))
                if key in route and route[key] != usual
            ]
            pprint((label + '{start}{to}{via}{src}{metric}{end}[muted]{extra}[/muted]').format(
                title='Routes:' if pos == 0 else '',
                to=route['to'],
                via=via,
                src=src,
                metric=metric,
                extra=' (' + ', '.join(details) + ')' if details else '',
                start='[highlight]' if is_default else '',
                end='[/highlight]' if is_default else '',
            ))

        activation_mode = info.get('activation_mode')
        if activation_mode:
            pprint((label + '{value}').format(
                title='Activation Mode:',
                value=activation_mode,
            ))
        pprint()

    hidden = total - len(interfaces)
    if hidden > 0:
        pprint('{} inactive interfaces hidden. Use "--all" to show all.'.format(hidden))


def command(self):
    """Collect the network state and print it in the requested format.

    Starts systemd-networkd if it is not active, and exits with status 1 if
    the unit is masked, if iproute2/networkd cannot be queried, or if the
    requested interface does not exist.
    """
    service = 'systemd-networkd.service'
    # Make sure sd-networkd is running, as we need the data it provides.
    if not utils.systemctl_is_active(service):
        if utils.systemctl_is_masked(service):
            logging.error("'netplan status' depends on networkd, "
                          "but systemd-networkd.service is masked. "
                          "Please start it.")
            sys.exit(1)
        logging.debug('systemd-networkd.service is not active. Starting...')
        utils.systemctl('start', [service], True)

    # required data: iproute2 and sd-networkd can be expected to exist,
    # due to hard package dependencies
    iproute2 = self.query_iproute2()
    networkd = self.query_networkd()
    if not (iproute2 and networkd):
        logging.error('Could not query iproute2 or systemd-networkd')
        sys.exit(1)

    # optional data
    nmcli = self.query_nm()
    routes = self.query_routes()
    route4, route6 = routes
    resolved = self.query_resolved()
    dns_addresses, dns_search = resolved

    interfaces = []
    for link in iproute2:
        interfaces.append(Interface(link, networkd, nmcli, (dns_addresses, dns_search), (route4, route6)))
    total = len(interfaces)

    # show only active interfaces by default
    filtered = [itf for itf in interfaces if itf.operstate != 'DOWN']
    # down interfaces do not contribute anything to the online state
    online_state = self.query_online_state(filtered)

    # show only a single interface, if requested
    # XXX: bash completion (for interfaces names)
    if self.ifname:
        wanted = next((itf for itf in interfaces if itf.name == self.ifname), None)
        if wanted is None:
            logging.error('Could not find interface {}'.format(self.ifname))
            sys.exit(1)
        filtered = [wanted]

    # Global state
    data = {
        'netplan-global-state': {
            'online': online_state,
            'nameservers': self.resolvconf_json()
        }
    }
    # Per interface
    for itf in (interfaces if self.all else filtered):
        ifname, obj = itf.json()
        data[ifname] = obj

    # Output data in requested format
    output_format = self.format.lower()
    if output_format == 'json':
        # structural JSON output
        print(json.dumps(data, indent=None))
    elif output_format == 'yaml':
        # structural YAML output
        print(yaml.dump(data, default_flow_style=False))
    else:
        # pretty print, human readable output
        self.pretty_print(data, total)


# --- archive member boundary: netplan/cli/commands/migrate.py ---
#!/usr/bin/python3
#
# Copyright (C) 2018 Canonical, Ltd.
# Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

'''netplan migrate command line'''

import logging
import os
import sys
import re
from glob import glob
try:
    import yaml
    NO_YAML = False
except ImportError:  # pragma: nocover
    NO_YAML = True
from collections import OrderedDict
import ipaddress

import netplan.cli.utils as utils


class NetplanMigrate(utils.NetplanCommand):
    '''Convert an ifupdown /etc/network/interfaces setup into netplan YAML.'''

    def __init__(self):
        super().__init__(command_id='migrate',
                         description='Migration of /etc/network/interfaces to netplan',
                         leaf=True,
                         testing=True)

    def parse_dns_options(self, if_options, if_config):
        """Parse dns options (dns-nameservers and dns-search) from if_options
        (an interface options dict) into the interface configuration if_config

        Entries may be separated by any run of whitespace (spaces or tabs).
        Mutates the arguments in place: the handled options are removed from
        if_options and appended to if_config['nameservers'].
        """
        for option, key in (('dns-nameservers', 'addresses'), ('dns-search', 'search')):
            if option not in if_options:
                continue
            entries = if_config.setdefault('nameservers', {}).setdefault(key, [])
            # split() without arguments drops empty fields, so repeated
            # separators are harmless; validate?
            entries += if_options[option].split()
            del if_options[option]

    def parse_mtu(self, iface, if_options, if_config):
        """Parse out the MTU. Operates the same way as parse_dns_options

        iface is the name of the interface, used only to print error messages.
        Exits with status 2 if the value is not an integer or conflicts with
        an MTU already present in if_config.
        """
        if 'mtu' not in if_options:
            return
        try:
            mtu = int(if_options['mtu'])
        except ValueError:
            logging.error('%s: cannot parse "%s" as an MTU', iface, if_options['mtu'])
            sys.exit(2)

        if 'mtu' in if_config and if_config['mtu'] != mtu:
            logging.error('%s: tried to set MTU=%d, but already have MTU=%d', iface, mtu, if_config['mtu'])
            sys.exit(2)

        if_config['mtu'] = mtu
        del if_options['mtu']

    def parse_hwaddress(self, iface, if_options, if_config):
        """Parse out the manually configured MAC.
        Operates the same way as parse_dns_options

        iface is the name of the interface, used only to print error messages.
        Exits with status 2 if if_config already holds a different MAC.
        """
        if 'hwaddress' not in if_options:
            return
        hwaddress = if_options['hwaddress']
        if 'macaddress' in if_config and if_config['macaddress'] != hwaddress:
            logging.error('%s: tried to set MAC %s, but already have MAC %s', iface,
                          hwaddress, if_config['macaddress'])
            sys.exit(2)

        if_config['macaddress'] = hwaddress
        del if_options['hwaddress']

    def run(self):
        '''Register the command line options and run the migration.'''
        self.parser.add_argument('--root-dir',
                                 help='Search for and generate configuration files in this root directory instead of /')
        self.parser.add_argument('--dry-run', action='store_true',
                                 help='Print converted netplan configuration to stdout instead of writing/changing files')

        self.func = self.command_migrate

        self.parse_args()

        if NO_YAML:  # pragma: nocover
            # NOTE(review): the text was recovered from a whitespace-collapsed
            # copy; the separator between the two sentences may originally
            # have been a line break - confirm.
            logging.error("The `yaml` Python package couldn't be imported, and is needed for the migrate command. "
                          "To install it on Debian or Ubuntu-based system, run `apt install python3-yaml`")
            sys.exit(1)

        self.run_command()

    def _convert_static(self, iface, family, options, c):
        '''Fill the netplan interface config c from a static ifupdown stanza.

        family is 'inet' or 'inet6'; options is the stanza's remaining
        option dict (mtu, hwaddress and the dns options must already have
        been consumed).  Appends "address/prefixlen" to c['addresses'] and
        sets the gateway (and accept_ra for inet6).  Exits with status 2 on
        unsupported or unknown options and on unparsable addresses.
        '''
        if family == 'inet':
            # Already handled: mtu, hwaddress
            # Supported: address netmask gateway
            # Not supported yet: metric(?)
            # No YAML support: pointopoint scope broadcast
            label = 'IPv4'
            address_cls = ipaddress.IPv4Address
            network_cls = ipaddress.IPv4Network
            gateway_key = 'gateway4'
            supported_opts = {'address', 'netmask', 'gateway'}
            unsupported_opts = {'broadcast', 'metric', 'pointopoint', 'scope'}
        else:
            assert family == 'inet6'
            # Already handled: mtu, hwaddress
            # supported: address netmask gateway
            # partially supported: accept_ra (0/1 supported, 2 has no YAML rep)
            # unsupported: metric(?)
            # no YAML representation: media autoconf privext scope
            #                         preferred-lifetime dad-attempts dad-interval
            label = 'IPv6'
            address_cls = ipaddress.IPv6Address
            network_cls = ipaddress.IPv6Network
            gateway_key = 'gateway6'
            supported_opts = {'address', 'netmask', 'gateway', 'accept_ra'}
            unsupported_opts = {'metric', 'media', 'autoconf', 'privext',
                                'scope', 'preferred-lifetime', 'dad-attempts', 'dad-interval'}

        bad_opts = set(options) - supported_opts
        for unsupported in bad_opts & unsupported_opts:
            logging.error('%s: unsupported %s option "%s"', iface, family, unsupported)
            sys.exit(2)
        for unknown in bad_opts - unsupported_opts:
            logging.error('%s: unknown %s option "%s"', iface, family, unknown)
            sys.exit(2)

        # the address may contain a /prefix suffix, or
        # the netmask property may be used. It's not clear
        # what happens if both are supplied.
        if 'address' not in options:
            logging.error('%s: no address supplied in static method', iface)
            sys.exit(2)

        address = options['address']
        if '/' in address:
            addr_spec = address.split('/')[0]
            net_spec = address
        else:
            if 'netmask' not in options:
                logging.error('%s: address does not specify prefix length, and netmask not specified',
                              iface)
                sys.exit(2)
            addr_spec = address
            net_spec = address + '/' + options['netmask']

        try:
            ipaddr = address_cls(addr_spec)
        except ipaddress.AddressValueError as a:
            logging.error('%s: error parsing "%s" as an %s address: %s', iface, addr_spec, label, a)
            sys.exit(2)

        try:
            # strict=False: the host bits of the address are allowed to be set
            ipnet = network_cls(net_spec, strict=False)
        except ipaddress.NetmaskValueError as a:
            logging.error('%s: error parsing "%s" as an %s network: %s', iface, net_spec, label, a)
            sys.exit(2)

        c['addresses'] += [str(ipaddr) + '/' + str(ipnet.prefixlen)]

        if 'gateway' in options:
            # validate?
            c[gateway_key] = options['gateway']

        if family == 'inet6' and 'accept_ra' in options:
            accept_ra = options['accept_ra']
            if accept_ra == '0':
                c['accept_ra'] = False
            elif accept_ra == '1':
                c['accept_ra'] = True
            elif accept_ra == '2':
                logging.error('%s: netplan does not support accept_ra=2', iface)
                sys.exit(2)
            else:
                logging.error('%s: unexpected accept_ra value "%s"', iface, accept_ra)
                sys.exit(2)

    def command_migrate(self):
        '''Convert the parsed ifupdown configuration and write it out.

        With --dry-run the YAML is printed to stdout.  Otherwise it is
        written to etc/netplan/10-ifupdown.yaml below the root directory
        (exit status 3 if that file already exists) and the interfaces file
        is renamed with a ".netplan-converted" suffix.  Configuration that
        cannot be converted ends the program with exit status 2.
        '''
        netplan_config = {}
        try:
            ifaces, auto_ifaces = self.parse_ifupdown(self.root_dir or '')
        except ValueError as e:
            logging.error(str(e))
            sys.exit(2)
        for iface, family_config in ifaces.items():
            for family, config in family_config.items():
                logging.debug('Converting %s family %s %s', iface, family, config)
                if iface not in auto_ifaces:
                    logging.error('%s: non-automatic interfaces are not supported', iface)
                    sys.exit(2)
                method = config['method']
                options = config['options']
                if method == 'loopback':
                    # both systemd and modern ifupdown set up lo automatically
                    logging.debug('Ignoring loopback interface %s', iface)
                elif method == 'dhcp':
                    c = netplan_config.setdefault('network', {}).setdefault('ethernets', {}).setdefault(iface, {})

                    self.parse_dns_options(options, c)
                    self.parse_hwaddress(iface, options, c)

                    # anything the parsers above did not consume is unsupported
                    if options:
                        logging.error('%s: option(s) %s are not supported for dhcp method',
                                      iface, ", ".join(options.keys()))
                        sys.exit(2)
                    if family == 'inet':
                        c['dhcp4'] = True
                    else:
                        assert family == 'inet6'
                        c['dhcp6'] = True
                elif method == 'static':
                    c = netplan_config.setdefault('network', {}).setdefault('ethernets', {}).setdefault(iface, {})

                    if 'addresses' not in c:
                        c['addresses'] = []

                    self.parse_dns_options(options, c)
                    self.parse_mtu(iface, options, c)
                    self.parse_hwaddress(iface, options, c)
                    self._convert_static(iface, family, options, c)
                else:  # pragma nocover
                    # this should be unreachable
                    logging.error('%s: method %s is not supported', iface, method)
                    sys.exit(2)

        if_config = os.path.join(self.root_dir or '/', 'etc/network/interfaces')

        if netplan_config:
            netplan_config['network']['version'] = 2
            netplan_yaml = yaml.dump(netplan_config)
            if self.dry_run:
                print(netplan_yaml)
            else:
                dest = os.path.join(self.root_dir or '/', 'etc/netplan/10-ifupdown.yaml')
                try:
                    os.makedirs(os.path.dirname(dest))
                except FileExistsError:
                    pass
                try:
                    # mode 'x' refuses to overwrite an earlier migration result
                    with open(dest, 'x') as f:
                        f.write(netplan_yaml)
                except FileExistsError:
                    logging.error('%s already exists; remove it if you want to run the migration again', dest)
                    sys.exit(3)
                logging.info('migration complete, wrote %s', dest)
        else:
            logging.warning('ifupdown does not configure any interfaces, nothing to migrate')

        if not self.dry_run:
            logging.info('renaming %s to %s.netplan-converted', if_config, if_config)
            os.rename(if_config, if_config + '.netplan-converted')

    def _ifupdown_lines_from_file(self, rootdir, path):
        '''Return normalized lines from ifupdown config

        This resolves "source" and "source-directory" includes.
        Lines are stripped; empty lines and comments are dropped.  A file
        that does not exist contributes no lines.
        '''
        def expand_source_arg(rootdir, curdir, line):
            arg = line.split()[1]
            if arg.startswith('/'):
                return rootdir + arg
            return curdir + '/' + arg

        lines = []
        rootdir_len = len(rootdir) + 1
        valid_re = re.compile('^[a-zA-Z0-9_-]+$')
        try:
            with open(rootdir + '/' + path) as f:
                logging.debug('reading %s', f.name)
                # Relative includes are resolved against this file's
                # directory.  It is computed once, and the loops below use
                # their own variable names, so that the open file is never
                # rebound while further include lines are still to be read.
                curdir = os.path.dirname(f.name)
                for line in f:
                    # normalize, strip empty lines and comments
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue

                    if line.startswith('source-directory '):
                        d = expand_source_arg(rootdir, curdir, line)
                        for entry in os.listdir(d):
                            if valid_re.match(entry):
                                lines += self._ifupdown_lines_from_file(rootdir, os.path.join(d[rootdir_len:], entry))
                    elif line.startswith('source '):
                        for included in glob(expand_source_arg(rootdir, curdir, line)):
                            lines += self._ifupdown_lines_from_file(rootdir, included[rootdir_len:])
                    else:
                        lines.append(line)
        except FileNotFoundError:
            logging.debug('%s/%s does not exist, ignoring', rootdir, path)
        return lines

    def parse_ifupdown(self, rootdir='/'):
        '''Parse ifupdown configuration.

        Return (iface_name → family → {method, options}, auto_ifaces: set) tuple
        on successful parsing, or a ValueError when encountering an invalid file or
        ifupdown features which are not supported (such as "mapping").

        options is itself a dictionary option_name → value.
        '''
        # expected number of fields for every possible keyword, excluding the keyword itself
        fieldlen = {'auto': 1, 'allow-auto': 1, 'allow-hotplug': 1, 'mapping': 1, 'no-scripts': 1, 'iface': 3}

        # read and normalize all lines from config, with resolving includes
        lines = self._ifupdown_lines_from_file(rootdir, '/etc/network/interfaces')

        ifaces = OrderedDict()
        auto = set()
        in_options = None  # interface name if parsing options lines after iface stanza
        in_family = None

        # we now have resolved all includes and normalized lines
        for line in lines:
            fields = line.split()
            keyword = fields[0]

            # does the line start with a known stanza field?
            exp_len = fieldlen.get(keyword)
            if exp_len is None:
                # no known stanza field, are we in an iface stanza and parsing options?
                if not in_options:
                    raise ValueError('Unknown stanza type %s' % keyword)
                logging.debug('in_options %s, parsing as option: %s', in_options, line)
                option = line.split(maxsplit=1)
                if len(option) < 2:
                    # an option name without a value is an invalid file
                    raise ValueError('Option %s of interface %s has no value' % (keyword, in_options))
                ifaces[in_options][in_family]['options'][keyword] = option[1]
                continue

            logging.debug('line fields %s (expected length: %i)', fields, exp_len)
            in_options = None  # stop option line parsing of iface stanza
            in_family = None

            # do we have the expected #parameters?
            if len(fields) != exp_len + 1:
                raise ValueError('Expected %i fields for stanza type %s but got %i' %
                                 (exp_len, keyword, len(fields) - 1))

            # we have a valid stanza line now, handle them
            if keyword in ('auto', 'allow-auto', 'allow-hotplug'):
                auto.add(fields[1])
            elif keyword == 'mapping':
                raise ValueError('mapping stanza is not supported')
            elif keyword == 'no-scripts':
                pass  # ignore these
            elif keyword == 'iface':
                if fields[2] not in ('inet', 'inet6'):
                    raise ValueError('Unknown address family %s' % fields[2])
                if fields[3] not in ('loopback', 'static', 'dhcp'):
                    raise ValueError('Unsupported method %s' % fields[3])
                in_options = fields[1]
                in_family = fields[2]
                ifaces.setdefault(fields[1], OrderedDict())[in_family] = {'method': fields[3], 'options': {}}
            else:
                raise NotImplementedError('stanza type %s is not implemented' % keyword)  # pragma nocover

        logging.debug('final parsed interfaces: %s; auto ifaces: %s', ifaces, auto)
        return (ifaces, auto)


# --- archive member boundary: netplan/cli/commands/__init__.py ---
#!/usr/bin/python3
#
# Copyright (C) 2018 Canonical, Ltd.
# Author: Mathieu Trudel-Lapierre <mathieu.trudel-lapierre@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
# # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from netplan.cli.commands.apply import NetplanApply from netplan.cli.commands.generate import NetplanGenerate from netplan.cli.commands.ip import NetplanIp from netplan.cli.commands.migrate import NetplanMigrate from netplan.cli.commands.try_command import NetplanTry from netplan.cli.commands.info import NetplanInfo from netplan.cli.commands.set import NetplanSet from netplan.cli.commands.get import NetplanGet from netplan.cli.commands.sriov_rebind import NetplanSriovRebind from netplan.cli.commands.status import NetplanStatus __all__ = [ 'NetplanApply', 'NetplanGenerate', 'NetplanIp', 'NetplanMigrate', 'NetplanTry', 'NetplanInfo', 'NetplanSet', 'NetplanGet', 'NetplanSriovRebind', 'NetplanStatus', ] PK ! ��6 &