#! /usr/bin/env python
#
# Copyright (C) 2013-2016, 2018, 2019, University of Zurich.
# Copyright (C) 2020 ETH Zurich.
# Copyright (C) 2022 Google LLC.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Turn ElastiCluster configuration into internal data structures.
Digesting configuration files into data structures ready to be processed by the
rest of ElastiCluster happens in three stages:
1. Read configuration files and create a (nested) key/value store of all the
configuration items.
2. Arrange the configuration items into sets of properties that are needed to
create ElastiCluster objects (clusters, cloud providers, etc.) -- the
outcome of this phase would be a set of dictionaries that can be fed as
`**kwargs` to class constructors.
3. Instanciate the actual working objects.
"""
from __future__ import (print_function, division, absolute_import)
# compatibility imports
from future import standard_library
standard_library.install_aliases()
# stdlib imports
from builtins import zip
from builtins import object
from collections import defaultdict
from importlib import import_module
import os
from os.path import expanduser, expandvars
import re
import sys
from urllib.parse import urlparse
from warnings import warn
if sys.version_info[0] == 2:
from ConfigParser import SafeConfigParser
def make_config_parser():
return SafeConfigParser()
else:
# `SafeConfigParser` was deprecated in Py3 in favor of `ConfigParser`
from configparser import ConfigParser
def make_config_parser():
return ConfigParser(strict=False)
# 3rd-party modules
from pkg_resources import resource_filename
from schema import Schema, SchemaError, Optional, Or, Regex
# ElastiCluster imports
from elasticluster import log
from elasticluster.exceptions import ConfigurationError
from elasticluster.providers.ansible_provider import AnsibleSetupProvider
from elasticluster.cluster import Cluster, NodeNamingPolicy
from elasticluster.repository import MultiDiskRepository
from elasticluster.utils import environment
from elasticluster.validate import (
alert,
boolean,
executable_file,
existing_file,
hostname,
nonempty_str,
nonnegative_int,
nova_api_version,
positive_int,
readable_file,
url,
)
## defaults and built-in config
KEY_RENAMES = [
# pylint: disable=bad-whitespace,bad-continuation
# section from key to key verbose? supported until...
('cluster', 'setup_provider', 'setup', True, '2.0'),
('cloud', 'tenant_name', 'project_name', True, '2.0'),
# working on issue #279 uncovered a conflict between code and
# docs: the documentation referred to config keys
# `<class>_min_nodes` but the code actually looked for
# `<class>_nodes_min`. Keep this last version as it makes the
# code simpler, but alert users of the change...
('cluster', re.compile(r'([0-9a-z_-]+)_min_nodes'),
r'\1_nodes_min', True, '2.0'),
('setup', 'ssh_pipelining', 'ansible_ssh_pipelining',
True, '1.4'),
]
SCHEMA = {
'cloud': {
'provider': Or('azure', 'ec2_boto', 'google', 'opennebula', 'openstack', 'libcloud'),
# allow other keys w/out restrictions; each cloud provider has its own
# set of keys, which are handled separately
Optional(str): str,
},
'cluster': {
'cloud': str,
'setup': str,
'login': str,
'nodes': {
str: {
'flavor': nonempty_str,
'image_id': nonempty_str,
Optional('image_userdata', default=''): str,
Optional('security_group', default='default'): str, ## FIXME: alphanumeric?
Optional('network_ids'): str,
# these are auto-generated but already there by the time
# validation is run
'login': nonempty_str,
'num': positive_int,
'min_num': nonnegative_int,
# only on Azure
Optional("storage_account_type", default='Standard_LRS'): Or('Standard_LRS', 'Premium_LRS', 'StandardSSD_LRS', 'UltraSSD_LRS'),
# only on Google Cloud
Optional("accelerator_count", default=0): nonnegative_int,
Optional("accelerator_type"): nonempty_str,
Optional("local_ssd_count", default=0): nonnegative_int,
Optional("local_ssd_interface", default='SCSI'): Or('NVME', 'SCSI'),
Optional("min_cpu_platform"): nonempty_str,
# only on OpenStack
Optional('floating_network_id'): str,
Optional("request_floating_ip"): boolean,
# allow other string keys w/out restrictions
Optional(str): str,
},
},
Optional("ssh_probe_timeout", default=5): positive_int,
Optional("ssh_proxy_command", default=''): str,
Optional("start_timeout", default=600): positive_int,
# only on Azure
Optional("storage_account_type", default='Standard_LRS'): Or('Standard_LRS', 'Premium_LRS', 'StandardSSD_LRS', 'UltraSSD_LRS'),
# only on Google Cloud
Optional("accelerator_count", default=0): nonnegative_int,
Optional("accelerator_type"): nonempty_str,
Optional("allow_project_ssh_keys", default=True): boolean,
Optional("local_ssd_count", default=0): nonnegative_int,
Optional("local_ssd_interface", default='SCSI'): Or('NVME', 'SCSI'),
Optional("min_cpu_platform"): nonempty_str,
# only on OpenStack
Optional('floating_network_id'): str,
Optional("request_floating_ip"): boolean,
# allow other string keys w/out restrictions
Optional(str): str,
},
'login': {
'image_user': nonempty_str,
Optional('image_sudo', default=True): boolean,
Optional('image_user_sudo', default="root"): nonempty_str,
Optional('image_userdata', default=''): str,
'user_key_name': str, # FIXME: are there restrictions? (e.g., alphanumeric)
'user_key_private': readable_file,
'user_key_public': readable_file,
},
'setup': {
Optional('provider', default='ansible'): str,
Optional("playbook_path",
default=os.path.join(
resource_filename('elasticluster', 'share/playbooks'),
'main.yml')): readable_file,
Optional("ansible_command"): executable_file,
Optional("ansible_extra_args"): str,
Optional("safe_but_slower", default=False): boolean,
# allow other keys w/out restrictions
str: str,
},
'storage': {
Optional('storage_path', default=os.path.expanduser("~/.elasticluster/storage")): str,
Optional('storage_type'): Or('yaml', 'json', 'pickle'),
},
}
CLOUD_PROVIDER_SCHEMAS = {
'azure': {
"provider": 'azure',
Optional("subscription_id", default=os.getenv('AZURE_SUBSCRIPTION_ID', '')): nonempty_str,
Optional("tenant_id", default=os.getenv('AZURE_TENANT_ID', '')): nonempty_str,
Optional("client_id", default=os.getenv('AZURE_CLIENT_ID', '')): nonempty_str,
Optional("secret", default=os.getenv('AZURE_CLIENT_SECRET', '')): nonempty_str,
Optional("location", default="westus"): nonempty_str,
Optional("vm_deployment_template", default=None): existing_file,
Optional("net_deployment_template", default=None): existing_file,
Optional("certificate"): alert(
"The `certificate` setting is no longer valid"
" in the Azure configuration."
" Please remove it from your configuration file."),
Optional("wait_timeout"): alert(
"The `wait_timeout` setting is no longer valid"
" in the Azure configuration."
" Please remove it from your configuration file."),
},
'ec2_boto': {
"provider": 'ec2_boto',
"ec2_url": url,
Optional("ec2_access_key", default=os.getenv('EC2_ACCESS_KEY', '')): nonempty_str,
Optional("ec2_secret_key", default=os.getenv('EC2_SECRET_KEY', '')): nonempty_str,
"ec2_region": nonempty_str,
Optional("request_floating_ip", default=False): boolean,
Optional("vpc"): nonempty_str,
Optional("price", default=0): nonnegative_int,
Optional("timeout", default=0): nonnegative_int,
Optional("instance_profile"): nonempty_str,
},
'google': {
"provider": 'google',
"gce_project_id": nonempty_str,
Optional("gce_client_id"): nonempty_str,
Optional("gce_client_secret"): nonempty_str,
Optional("network", default="default"): nonempty_str,
Optional("noauth_local_webserver"): boolean,
Optional("zone", default="us-central1-a"): nonempty_str,
},
'opennebula': {
"provider": 'opennebula',
Optional("endpoint", default=os.getenv('ONE_URL', 'http://localhost:2633/RPC2')): url,
Optional("username", default=os.getenv('ONE_USERNAME', '')): nonempty_str,
Optional("password", default=os.getenv('ONE_PASSWORD', '')): nonempty_str,
},
'openstack': {
"provider": 'openstack',
Optional("auth_url"): url,
Optional("cacert"): existing_file,
Optional("username"): nonempty_str,
Optional("password"): nonempty_str,
Optional("user_domain_name"): nonempty_str,
Optional("project_domain_name"): nonempty_str,
Optional("project_name"): nonempty_str,
Optional("request_floating_ip"): boolean, ## DEPRECATED, place in cluster or node config
Optional("region_name"): nonempty_str,
Optional("availability_zone"): nonempty_str,
Optional("compute_api_version"): Or('1.1', '2'),
Optional("image_api_version"): Or('1', '2'),
Optional("network_api_version"): Or('2.0'),
Optional("volume_api_version"): Or('3'), # v2 and older are deprecated and python-cinderclient will error out
Optional("identity_api_version"): Or('3', '2'), # no default, can auto-detect
Optional("use_anti_affinity_groups"): boolean,
Optional("nova_api_version"): nova_api_version, ## DEPRECATED, use `compute_api_version` instead
Optional("build_timeout", default=30): nonnegative_int,
},
'libcloud': {
"provider": 'libcloud',
'driver_name': nonempty_str,
Optional(str): str,
}
}
CLOUD_PROVIDERS = {
# pylint: disable=bad-whitespace
'ec2_boto': ('elasticluster.providers.ec2_boto', 'BotoCloudProvider'),
'opennebula':('elasticluster.providers.opennebula', 'OpenNebulaCloudProvider'),
'openstack': ('elasticluster.providers.openstack', 'OpenStackCloudProvider'),
'google': ('elasticluster.providers.gce', 'GoogleCloudProvider'),
'azure': ('elasticluster.providers.azure_provider', 'AzureCloudProvider'),
'libcloud': ('elasticluster.providers.libcloud_provider', 'LibCloudProvider'),
}
SETUP_PROVIDERS = {
# pylint: disable=bad-whitespace
"ansible": ('elasticluster.providers.ansible_provider', 'AnsibleSetupProvider'),
}
def _get_provider(name, provider_map):
"""
Return the constructor for provider `name` in mapping `provider_map`.
Second argument `provider_map` is a Python mapping that translates a
provider kind name (e.g., ``ec2``) into a pair *(module, class)*;
`_get_provider` will attempt to import the named module (using Python's
standard import mechanisms) and return the `class` attribute from that
module.
:raise KeyError: If the given `name` is not a valid key in `provider_map`
:raise ImportError: If the module corresponding to `name`
in `provider_map` cannot be loaded.
:raise AttributeError: If the class name corresponding to `name`
in `provider_map` does not exist in the module.
"""
modname, clsname = provider_map[name]
mod = import_module(modname)
cls = getattr(mod, clsname)
log.debug("Using class %r from module %r to instanciate provider '%s'",
cls, mod, name)
return cls
def _make_defaults_dict():
"""
Return mapping from names to be used in `%()s` expansion.
"""
env = {}
# default location of Ansible playbooks; make it also available as
# `%(elasticluster_playbooks)` so one can write `%(elasticluster_playbooks)s/main.yml`
env['ansible_pb_dir'] = env['elasticluster_playbooks'] \
= resource_filename('elasticluster', 'share/playbooks')
return env
## public API entry point
[docs]def make_creator(configfiles, storage_path=None):
"""
Return a `Creator` instance initialized from given configuration files.
:param list configfiles: list of paths to the INI-style file(s).
For each path ``P`` in `configfiles`, if a directory named ``P.d``
exists, also reads all the `*.conf` files in that directory.
:param str storage_path:
path to the storage directory. If defined, a
:py:class:`repository.DiskRepository` class will be instantiated.
:return: :py:class:`Creator`
"""
try:
# only strings have the `.swapcase()` method; lists and tuples don't
configfiles.swapcase # pylint: disable=pointless-statement
configfiles = [configfiles]
except AttributeError:
# `configfiles` is list or tuple
pass
# also look for ``path.d/*.conf`` files
configfiles = _expand_config_file_list(configfiles)
if not configfiles:
raise ValueError('Empty list of config files')
config = load_config_files(configfiles)
return Creator(config, storage_path=storage_path)
def _expand_config_file_list(paths, ignore_nonexistent=True,
expand_user_dir=True, expand_env_vars=False):
"""
Return list of (existing) configuration files.
The list of configuration file is built in the following way:
- any path pointing to an existing file is included in the result;
- for any path ``P``, if directory ``P.d`` exists, any file
contained in it and named ``*.conf`` is included in the
result;
- if argument `ignore_nonexistent` is ``True`` (default), then non-existing
paths are silently ignored and omitted from the returned result. Else, if
`ignore_nonexistent` is ``False``, a `ValueError` exception is raised.
If keyword arguments `expand_user_dir` and `expand_env_vars` are ``True``
(default), then each path is expanded with `os.path.expanduser` (resp.
`os.path.expandvars`).
"""
configfiles = set()
for path in paths:
if expand_user_dir:
path = os.path.expanduser(path)
if expand_env_vars:
path = os.path.expandvars(path)
if os.path.isfile(path):
configfiles.add(path)
elif not ignore_nonexistent:
raise ValueError(
"Configuration file `{0}` does not exist"
.format(path))
path_d = path + '.d'
if os.path.isdir(path_d):
for entry in os.listdir(path_d):
if entry.endswith('.conf'):
cfgfile = os.path.join(path_d, entry)
if cfgfile not in configfiles:
configfiles.add(cfgfile)
return list(configfiles)
## loading and parsing
# validation regexps
_CLUSTER_NAME_RE = re.compile('^[a-z0-9+_-]+$', re.I)
[docs]def load_config_files(paths):
"""
Read configuration file(s) and return corresponding data structure.
:param paths: list of file names to load.
"""
# I wish there were a "pipelinine" operator in Python, so I could rewrite
# this as `paths *into* raw_config *into* _arrange_config_tree ...`
raw_config = _read_config_files(paths)
tree_config1 = _arrange_config_tree(raw_config)
tree_config2 = _perform_key_renames(tree_config1)
complete_config = _build_node_section(tree_config2)
object_tree = _validate_and_convert(complete_config)
deref_config = _dereference_config_tree(object_tree)
final_config = _cross_validate_final_config(deref_config)
return final_config
def _read_config_files(paths):
"""
Read configuration data from INI-style file(s).
Data loaded from the given files is aggregated into a nested 2-level Python
mapping, where 1st-level keys are config section names (as read from the
files), and corresponding items are again key/value mappings (configuration
item name and value).
:param paths: list of filesystem paths of files to read
"""
# read given config files
configparser = make_config_parser()
# Preventing automatic lowercase of config keys
# see: https://stackoverflow.com/questions/19359556/configparser-reads-capital-keys-and-make-them-lower-case
configparser.optionxform = str
configparser.read(paths)
# temporarily modify environment to allow both `${...}` and `%(...)s`
# variable substitution in config values
defaults = _make_defaults_dict()
config = {}
with environment(**defaults):
for section in configparser.sections():
config[section] = {}
for key in configparser.options(section):
# `configparser.get()` performs the `%(...)s` substitutions
value = configparser.get(section, key, vars=defaults)
# `expandvars()` performs the `${...}` substitutions
config[section][key] = expandvars(value)
return config
def _arrange_config_tree(raw_config):
"""
Group configuration data by section type.
Given the 'raw configuration data' (as returned by
`_read_config_files`:func:), create and return a nested mapping:
* 1st-level keys are strings naming section types (i.e., ``'cluster'``,
``'cloud'``, ``'login'``, ``'setup'``);
* 2nd-level keys are then the names given to such sections. For example,
the contents of section ``[login/ubuntu]`` would be accessible from the
return value ``C`` as ``C['login']['ubuntu']``.
As an exception, subsections of a named cluster (e.g.,
``[cluster/gridengine/qmaster]``) will be inserted as items in the
``'nodes'`` key of the named cluster. For example, key/value pairs read
from section ``[cluster/gridengine/qmaster]`` will be accessible as
``C['cluster']['gridengine']['nodes']['qmaster']``.
"""
tree = {}
for sect_name, sect_items in raw_config.items():
# skip empty sections
if not sect_items:
continue
path = sect_name.split('/')
# translate `cluster/foo/bar` -> `cluster/foo/__nodes__/bar`
if path[0] == 'cluster' and len(path) > 2:
path.insert(2, 'nodes')
_update_nested_item(tree, path, sect_items)
return tree
def _update_nested_item(D, path, items):
"""
Walk nested mapping `D` and update the last key in `path`.
For example::
>>> D = {'b': {'a': {}}}
>>> updated = _update_nested_item(D, ['b', 'a'], {'x':1, 'y':2})
>>> D['b']['a'] == {'x':1, 'y':2}
True
The 'update' operation leaves key/value pairs which are not in `items`
unchanged::
>>> D = {'b': {'a': {'z': 3}}}
>>> updated = _update_nested_item(D, ['b', 'a'], {'x':1, 'y':2})
>>> D['b']['a'] == {'x':1, 'y':2, 'z':3}
True
In fact, `_update_nested_item` can also be used in the 'degenerate' cases
where `path` is 1 or 0 elements long, in which case it becomes essentially
a more verbose syntax for `dict.update`::
>>> D = {'a': {}}
>>> updated = _update_nested_item(D, ['a'], {'x':1, 'y':2})
>>> D['a'] == {'x':1, 'y':2}
True
>>> D = {'z': 3}
>>> updated = _update_nested_item(D, [], {'x':1, 'y':2})
>>> D == {'x':1, 'y':2, 'z':3}
True
Note that the nested dictionaries corresponding to the specified `path`
will be created if they do not already exist::
>>> D = {}
>>> updated = _update_nested_item(D, ['b', 'a'], {'x':1, 'y':2})
>>> D == {'b': {'a': {'x':1, 'y':2}}}
True
"""
target = D
while path:
key = path.pop(0)
if key not in target:
target[key] = {}
target = target[key]
target.update(items)
return target
# pylint: disable=dangerous-default-value
def _perform_key_renames(tree, changes=KEY_RENAMES):
"""
Change a configuration "tree" in-place, renaming legacy keys to new names.
This function chiefly supports two distinct uses:
- allow old/legacy option names configuration files, but still warn users
of the new/updated name;
- allow alternate options names to be used in the configuration file but
normalize them to a "canonical" spelling before the code sees them.
Second argument `changes` is a list of items. Each item is a tuple
describing a single key rename:
- 1st field names the section type (e.g., ``cluster``) where the key
renames are going to happen;
- 2nd field is the old/legacy key name (can be a regular expression);
- 3rd field is the new/updated key name (or the substitution pattern
if 2nd field is a regexp);
- 4th field is a boolean flag: if ``True``, a warning will be emitted
telling users that the configuration option has been renamed; make this
``False`` to just allow option key synonyms;
- 5th field is the ElastiCluster release until which the automatic rename
will be supported (only relevant if 4th field "verbose" is ``True``).
"""
for section, from_key, to_key, verbose, supported in changes:
if section not in tree:
# XXX: should this be a configuration error instead?
log.warning(
"No section `%s` found in configuration!"
" This will almost certainly end up causing an error later on.",
section)
continue
for stanza, pairs in tree[section].items():
# ensure we work on a copy of the keys collection,
# so we can mutate the tree down below
for key in list(pairs.keys()):
substitute = False
try:
# try regexp match
match = from_key.match(key)
if match:
to_key = from_key.sub(key, to_key)
substitute = True
except AttributeError:
# plain old string match
substitute = (key == from_key)
if substitute:
tree[section][stanza][to_key] = tree[section][stanza][from_key]
del tree[section][stanza][from_key]
if verbose:
warn("Configuration key `{from_key}`"
" in section `{section}/{stanza}`"
" should be renamed to `{to_key}`"
" -- please update configuration file(s)."
" Support for automatic renaming will be"
" removed in {version} of ElastiCluster."
.format(
from_key=from_key,
to_key=to_key,
section=section,
stanza=stanza,
version=(("release {0}".format(supported))
if supported
else "a future release")))
return tree
def _dereference_config_tree(tree, evict_on_error=True):
# FIXME: Should allow *three* distinct behaviors on error?
# - "evict on error": remove the offending section and continue
# - "raise exception": raise a ConfigurationError at the first error
# - "just report": log errors but try to return all that makes sense
"""
Modify `tree` in-place replacing cross-references by section name with the
actual section content.
For example, if a cluster section lists a key/value pair
``'login': 'ubuntu'``, this will be replaced with ``'login': { ... }``.
"""
to_evict = []
for cluster_name, cluster_conf in tree['cluster'].items():
for key in ['cloud', 'login', 'setup']:
try:
refname = cluster_conf[key]
except KeyError:
log.error(
"Configuration section `cluster/%s`"
" is missing a `%s=` section reference."
" %s",
cluster_name, key,
("Dropping cluster definition." if evict_on_error else ""))
if evict_on_error:
to_evict.append(cluster_name)
break
else:
# cannot continue
raise ConfigurationError(
"Invalid cluster definition `cluster/{0}:"
" missing `{1}=` configuration key"
.format(cluster_name, key))
try:
# dereference
cluster_conf[key] = tree[key][refname]
except KeyError:
log.error(
"Configuration section `cluster/%s`"
" references non-existing %s section `%s`."
" %s",
cluster_name, key, refname,
("Dropping cluster definition." if evict_on_error else ""))
if evict_on_error:
to_evict.append(cluster_name)
break
for cluster_name in to_evict:
del tree['cluster'][cluster_name]
return tree
def _build_node_section(tree):
"""
Create or update nested mapping `nodes` into each cluster config.
Keys in the `nodes` mapping are node kind names (i.e., the first segment of
`*_nodes` configuration options), and corresponding values are
configuration key/value pairs that apply to nodes of that kind.
See also function `_gather_node_kind_info`:func: for more details on how
the kind-level configuration is built.
"""
for cluster_name, cluster_conf in tree['cluster'].items():
node_kind_config = dict((key, value)
for key, value in cluster_conf.items()
if key.endswith('_nodes'))
if 'nodes' not in cluster_conf:
cluster_conf['nodes'] = {}
for key in node_kind_config.keys():
kind_name = key[:-len('_nodes')]
# nodes can inherit the properties of cluster or overwrite them
kind_values = _gather_node_kind_info(kind_name, cluster_name, cluster_conf)
cluster_conf['nodes'][kind_name] = kind_values
return tree
def _gather_node_kind_info(kind_name, cluster_name, cluster_conf):
"""
Collect key/value configuration for nodes of a given kind.
Return a mapping of key/value configuration options; the mapping is
constructed by layering key/value pairs from two sources:
1. Cluster-level options;
2. Kind-specific attributes, as set in the ``[cluster/name/kind]`` sections.
Options from the latter override options set in the former.
"""
# copy cluster-level config
kind_values = {}
for attr in (
'flavor',
'image_id',
#'image_user', ## from `login/*`
'image_userdata',
'login',
'network_ids',
'security_group',
'node_name',
'ssh_proxy_command',
# Azure only
'storage_account_type',
# Google Cloud only
'accelerator_count',
'accelerator_type',
'allow_project_ssh_keys',
'boot_disk_size',
'boot_disk_type',
'min_cpu_platform',
'scheduling',
'tags',
# OpenStack only
'floating_network_id',
'request_floating_ip',
#'user_key_name', ## from `login/*`
#'user_key_private', ## from `login/*`
#'user_key_public', ## from `login/*`
):
if attr in cluster_conf:
kind_values[attr] = cluster_conf[attr]
# override with node-specific attrs (if given)
if kind_name in cluster_conf['nodes']:
for key, value in cluster_conf['nodes'][kind_name].items():
kind_values[key] = value
kind_values['num'], kind_values['min_num'] = \
_compute_desired_and_minimum_number_of_nodes(kind_name, cluster_name, cluster_conf)
return kind_values
# pylint: disable=invalid-name
def _compute_desired_and_minimum_number_of_nodes(kind_name, cluster_name, cluster_conf):
"""
Compute desired and minimum number of nodes of the given kind.
"""
num = int(cluster_conf[kind_name + '_nodes'])
if (kind_name + '_nodes_min') not in cluster_conf:
min_num = num
else:
min_num = int(cluster_conf[kind_name + '_nodes_min'])
if min_num > num:
raise ValueError(
" In cluster `{cluster_name}`:"
" Minimum number of '{kind}' nodes ({min_num})"
" is larger then the number"
" of '{kind}' nodes to start ({num})"
.format(
cluster_name=cluster_name,
kind=kind_name,
min_num=min_num,
num=num
))
return num, min_num
## validation and conversion
def _validate_and_convert(cfgtree, evict_on_error=True):
objtree = {}
for section, model in SCHEMA.items():
if section not in cfgtree:
continue
stanzas = cfgtree[section]
objtree[section] = {}
for name, properties in stanzas.items():
log.debug("Checking section `%s/%s` ...", section, name)
try:
objtree[section][name] = Schema(model).validate(properties)
# further checks for cloud providers
if section == 'cloud':
objtree[section][name] = _validate_cloud_section(objtree[section][name])
# check node name pattern in clusters conforms to RFC952
if section == 'cluster':
_validate_node_group_names(objtree[section][name])
except (SchemaError, ValueError) as err:
log.error("In section `%s/%s`: %s", section, name, err)
if evict_on_error:
log.error(
"Dropping configuration section `%s/%s`"
" because of the above errors", section, name)
# `objtree[section][name]` exists if the except was raised
# by the second validation (line 650)
if name in objtree[section]:
del objtree[section][name]
return objtree
def _validate_cloud_section(cloud_section):
"""
Run provider-specific schema validation.
"""
provider = cloud_section['provider']
return Schema(
CLOUD_PROVIDER_SCHEMAS[provider]).validate(cloud_section)
def _validate_node_group_names(cluster_section):
"""
Check that node group names conform to RFC 952.
"""
for nodename in cluster_section['nodes']:
hostname(nodename) ## raises ValueError if not conformant
return cluster_section
def _cross_validate_final_config(objtree, evict_on_error=True):
"""
Run validation checks that require correlating values from different sections.
"""
# take a copy of cluster config as we might be modifying it
for name, cluster in list(objtree['cluster'].items()):
valid = True
# ensure all cluster node kinds are defined in the `setup/*` section
setup_sect = cluster['setup']
for groupname, properties in cluster['nodes'].items():
if (groupname + '_groups') not in setup_sect:
log.error("Cluster `%s` requires nodes of kind `%s`,"
" but no such group is defined"
" in the referenced setup section.",
name, groupname)
valid = False
break
# ensure `ssh_to` has a valid value
if 'ssh_to' in cluster:
ssh_to = cluster['ssh_to']
try:
# extract node kind if this is a node name (e.g., `master001` => `master`)
parts = NodeNamingPolicy.parse(ssh_to)
ssh_to = parts['kind']
except ValueError:
pass
if ssh_to not in cluster['nodes']:
log.error("Cluster `%s` is configured to SSH into nodes of kind `%s`,"
" but no such kind is defined.", name, ssh_to)
valid = False
# EC2-specific checks
if cluster['cloud']['provider'] == 'ec2_boto':
cluster_uses_vpc = ('vpc' in cluster['cloud'])
for groupname, properties in cluster['nodes'].items():
if cluster_uses_vpc and 'network_ids' not in properties:
log.error(
"Node group `%s/%s` is being used in a VPC,"
" so it must specify ``network_ids``.",
cluster, groupname)
if evict_on_error:
valid = False
break
if not cluster_uses_vpc and 'network_ids' in properties:
log.error(
"Cluster `%s` must specify a VPC"
" to place `%s` instances in network `%s`",
cluster, groupname, properties['network_ids'])
if evict_on_error:
valid = False
break
if not valid:
log.error("Dropping cluster `%s` because of the above errors", name)
del objtree['cluster'][name]
return objtree
## general factory
[docs]class Creator(object):
"""
The `Creator` class is responsible for:
1. keeping track of the configuration, and
2. offering factory methods to create all kind of objects
that need information from the configuration, and
3. loading a cluster from a valid `repository.AbstractClusterRepository`.
First argument cluster configuration is a nested Python mapping structured
in the following way::
'cluster': { ## this must be literally `cluster`
{ "<cluster_template>" : {
"setup" : { properties of the setup section },
"cloud" : { properties of the cloud section },
"login" : { properties of the login section },
"cluster" : { properties of the cluster section },
"nodes": { "<node_kind>" : { properties of the node},
"<node_kind>" : { properties of the node},
},
},
"<cluster_template>" : {
(see above)
}
}
}
The actual "property" parameters follow the names and types described in the
`Configuration` section of the manual. This is indeed nothing more than a
dereferenced un-dump of the configuration file; use
`load_config_files`:func: to load a set of configuration files into a data
structure like the above.
:param dict cluster_conf: see description above
:param str storage_path: path to store data
:raises MultipleInvalid: configuration validation
"""
DEFAULT_STORAGE_PATH = os.path.expanduser("~/.elasticluster/storage")
DEFAULT_STORAGE_TYPE = 'yaml'
def __init__(self, conf, storage_path=None, storage_type=None):
self.cluster_conf = conf['cluster']
self.storage_path = (
os.path.expandvars(os.path.expanduser(storage_path)) if storage_path
else self.DEFAULT_STORAGE_PATH)
self.storage_type = storage_type or self.DEFAULT_STORAGE_TYPE
[docs] def load_cluster(self, cluster_name):
"""
Load a cluster from the configured repository.
:param str cluster_name: name of the cluster
:return: :py:class:`elasticluster.cluster.cluster` instance
"""
repository = self.create_repository()
cluster = repository.get(cluster_name)
cluster._setup_provider = self.create_setup_provider(cluster.template)
cluster.cloud_provider = self.create_cloud_provider(cluster.template)
cluster.update_config(self.cluster_conf[cluster.template])
return cluster
[docs] def create_cloud_provider(self, cluster_template):
"""
Return cloud provider instance for the given cluster template.
:param str cluster_template: name of cluster template to use
:return: cloud provider instance that fulfills the contract of
:py:class:`elasticluster.providers.AbstractCloudProvider`
"""
try:
conf_template = self.cluster_conf[cluster_template]
except KeyError:
raise ConfigurationError(
"No cluster template `{0}` found in configuration file"
.format(cluster_template))
try:
cloud_conf = conf_template['cloud']
except KeyError:
# this should have been caught during config validation!
raise ConfigurationError(
"No cloud section for cluster template `{0}`"
" found in configuration file"
.format(cluster_template))
try:
provider = cloud_conf['provider']
except KeyError:
# this should have been caught during config validation!
raise ConfigurationError(
"No `provider` configuration defined"
" in cloud section `{0}`"
" of cluster template `{1}`"
.format(
cloud_conf.get('name', '***'),
cluster_template
))
try:
ctor = _get_provider(provider, CLOUD_PROVIDERS)
except KeyError:
# this should have been caught during config validation!
raise ConfigurationError(
"Unknown cloud provider `{0}` for cluster `{1}`"
.format(provider, cluster_template))
except (ImportError, AttributeError) as err:
raise RuntimeError(
"Unable to load cloud provider `{0}`: {1}: {2}"
.format(provider, err.__class__.__name__, err))
provider_conf = cloud_conf.copy()
provider_conf.pop('provider')
# use a single keyword args dictionary for instanciating
# provider, so we can detect missing arguments in case of error
provider_conf['storage_path'] = self.storage_path
try:
return ctor(**provider_conf)
except TypeError:
# check that required parameters are given, and try to
# give a sensible error message if not; if we do not
# do this, users only see a message like this::
#
# ERROR Error: __init__() takes at least 5 arguments (4 given)
#
# which gives no clue about what to correct!
import inspect
args, varargs, keywords, defaults = inspect.getargspec(ctor.__init__)
if defaults is not None:
# `defaults` is a list of default values for the last N args
defaulted = dict((argname, value)
for argname, value in zip(reversed(args),
reversed(defaults)))
else:
# no default values at all
defaulted = {}
for argname in args[1:]: # skip `self`
if argname not in provider_conf and argname not in defaulted:
raise ConfigurationError(
"Missing required configuration parameter `{0}`"
" in cloud section for cluster `{1}`"
.format(argname, cluster_template))
[docs] def create_cluster(self, template, name=None, cloud=None, setup=None):
"""
Creates a ``Cluster``:class: instance by inspecting the configuration
properties of the given cluster template.
:param str template: name of the cluster template
:param str name: name of the cluster. If not defined, the cluster
will be named after the template.
:param cloud: A `CloudProvider`:py:class: instance to use
instead of the configured one. If ``None`` (default)
then the configured cloud provider will be used.
:param setup: A `SetupProvider`:py:class: instance to use
instead of the configured one. If ``None`` (default)
then the configured setup provider will be used.
:return: :py:class:`elasticluster.cluster.Cluster` instance:
:raises ConfigurationError: cluster template not found in config
"""
if template not in self.cluster_conf:
raise ConfigurationError(
"No cluster template configuration by the name `{template}`"
.format(template=template))
conf = self.cluster_conf[template]
extra = conf.copy()
extra.pop('cloud')
extra.pop('nodes')
extra.pop('setup')
extra['template'] = template
if cloud is None:
cloud = self.create_cloud_provider(template)
if name is None:
name = template
if setup is None:
setup = self.create_setup_provider(template, name=name)
cluster = Cluster(
name=(name or template),
cloud_provider=cloud,
setup_provider=setup,
user_key_name=conf['login']['user_key_name'],
user_key_public=conf['login']['user_key_public'],
user_key_private=conf['login']["user_key_private"],
repository=self.create_repository(),
**extra)
nodes = conf['nodes']
for group_name in nodes:
group_conf = nodes[group_name]
for varname in ['image_user', 'image_userdata']:
group_conf.setdefault(varname, conf['login'][varname])
cluster.add_nodes(group_name, **group_conf)
return cluster
[docs] def create_setup_provider(self, cluster_template, name=None):
"""Creates the setup provider for the given cluster template.
:param str cluster_template: template of the cluster
:param str name: name of the cluster to read configuration properties
"""
try:
conf_template = self.cluster_conf[cluster_template]
except KeyError as err:
raise ConfigurationError(
"No cluster template `{0}` found in configuration file"
.format(cluster_template))
try:
conf = conf_template['setup']
except KeyError as err:
# this should have been caught during config validation!
raise ConfigurationError(
"No setup section for cluster template `{0}`"
" found in configuration file"
.format(cluster_template))
if name:
conf['cluster_name'] = name
conf_login = self.cluster_conf[cluster_template]['login']
provider_name = conf.get('provider', 'ansible')
if provider_name not in SETUP_PROVIDERS:
raise ConfigurationError(
"Invalid value `%s` for `setup_provider` in configuration "
"file." % provider_name)
provider = _get_provider(provider_name, SETUP_PROVIDERS)
storage_path = self.storage_path
playbook_path = conf.pop('playbook_path', None)
groups = self._read_node_groups(conf)
environment_vars = {}
for node_kind, grps in groups.items():
if not isinstance(grps, list):
groups[node_kind] = [grps]
# Environment variables parsing
environment_vars[node_kind] = {}
for key, value in (list(conf.items())
+ list(self.cluster_conf[cluster_template].items())):
# Set both group and global variables
for prefix in [(node_kind + '_var_'), "global_var_"]:
if key.startswith(prefix):
var = key.replace(prefix, '')
environment_vars[node_kind][var] = value
log.debug("setting variable %s=%s for node kind %s",
var, value, node_kind)
return provider(groups, playbook_path=playbook_path,
environment_vars=environment_vars,
storage_path=storage_path,
sudo=conf_login['image_sudo'],
sudo_user=conf_login['image_user_sudo'],
**conf)
def _read_node_groups(self, conf):
"""
Return mapping from node kind names to list of Ansible host group names.
"""
result = defaultdict(list)
for key, value in conf.items():
if not key.endswith('_groups'):
continue
node_kind = key[:-len('_groups')]
group_names = [group_name.strip()
for group_name in value.split(',') if group_name.strip()]
for group_name in group_names:
# handle renames
if group_name in self._RENAMED_NODE_GROUPS:
old_group_name = group_name
group_name, remove_at = self._RENAMED_NODE_GROUPS[group_name]
warn(
"Group `{0}` was renamed to `{1}`;"
" please fix your configuration file."
" Support for automatically renaming"
" this group will be removed in {2}."
.format(old_group_name, group_name,
(("ElastiCluster {0}".format(remove_at))
if remove_at
else ("a future version of ElastiCluster"))),
DeprecationWarning)
result[node_kind].append(group_name)
return result
_RENAMED_NODE_GROUPS = {
# old name -> (new name will be removed in...
'condor_workers': ('condor_worker', '1.4'),
'gluster_client': ('glusterfs_client', '1.4'),
'gluster_data' : ('glusterfs_server', '1.4'),
'gridengine_clients': ('gridengine_worker', '2.0'),
'maui_master': ('torque_master', '2.0'),
'pbs_clients': ('torque_worker', '2.0'),
'pbs_master': ('torque_master', '2.0'),
'slurm_clients': ('slurm_worker', '2.0'),
'slurm_workers': ('slurm_worker', '1.4'),
}
def create_repository(self):
return MultiDiskRepository(self.storage_path,
self.storage_type)