# Copyright(c) 2020 Dell Inc. or its subsidiaries.
#
# Licensed under the Apache License, Version 2.0(the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""settings.py."""
import logging
from PyU4V.common import CommonFunctions
from PyU4V.utils import constants
LOG = logging.getLogger(__name__)
[docs]
class SettingsFunctions(object):
def __init__(self, array_id, rest_client):
    """Set up the settings function set.

    :param array_id: storage array ID stored on the instance; methods
                     that accept an optional array_id fall back to it
                     -- string
    :param rest_client: object handed to CommonFunctions -- object
    """
    self.common = CommonFunctions(rest_client)
    # Plain attribute assignments; neither depends on the other.
    self.version = constants.UNISPHERE_VERSION
    self.array_id = array_id
# Unisphere registration functions
[docs]
def get_scg_configuration_details(self) -> dict:
    """Retrieve the SCG configuration details.

    Issues a GET on the settings/registration/scg resource with an
    empty set of query parameters.

    :returns: response of the GET request -- dict
    """
    uri = f"/{self.common.UNI_VERSION}/settings/registration/scg"
    return self.common.get_request(
        target_uri=uri, resource_type=None, params={})
[docs]
def get_scg_server_certificate_configuration(self) -> dict:
    """Retrieve the SCG server certificate configuration.

    Issues a GET on the settings/registration/scg/server_cert resource.

    :returns: response of the GET request -- dict
    """
    version = self.common.UNI_VERSION
    uri = f"/{version}/settings/registration/scg/server_cert"
    return self.common.get_request(target_uri=uri, resource_type=None)
[docs]
def get_cloudiq_data_collection_configuration(self) -> dict:
    """Retrieve the CloudIQ data collection configuration.

    Issues a GET on the settings/registration/cloudiq resource with an
    empty set of query parameters. The call takes no arguments.

    :returns: response of the GET request -- dict
    """
    uri = f"/{self.common.UNI_VERSION}/settings/registration/cloudiq"
    return self.common.get_request(
        target_uri=uri, resource_type=None, params={})
[docs]
def register_cloudiq_data_collection(
        self, send_data: bool = None,
        data_collection_disabled: list[str] = None) -> dict:
    """Register CloudIQ Data Collection.

    SCG Gateway must be configured.

    :param send_data: value sent as "send_data" in the request body
                      -- bool
    :param data_collection_disabled: value sent as
                                     "data_collection_disabled" in the
                                     request body; when omitted a fresh
                                     [""] is sent -- list
    :returns: response of the modify request -- dict
    """
    # A list literal in the signature would be created once and shared by
    # every call (and by whatever the REST layer does with the payload),
    # so the default is materialised per call instead.
    if data_collection_disabled is None:
        data_collection_disabled = [""]
    payload = {
        "send_data": send_data,
        "data_collection_disabled": data_collection_disabled
    }
    uri = f"/{self.common.UNI_VERSION}/settings/registration/cloudiq"
    return self.common.modify_resource(
        target_uri=uri, resource_type=None, payload=payload)
# Functions for alert configurations
[docs]
def get_alert_notification_target_config(self) -> dict:
    """Retrieve Unisphere's Alert Notification Configuration.

    Issues a GET on the settings/alert/notification resource.

    :returns: Unisphere's Alert Notification Settings -- dict
    """
    uri = f"/{self.common.UNI_VERSION}/settings/alert/notification"
    return self.common.get_request(target_uri=uri, resource_type=None)
[docs]
def update_alert_notification_targets(
        self, payload: dict = None, enable_email: bool = None,
        email_server: str = None, sender_address: str = None,
        email_port: int = None, enable_syslog: bool = None,
        syslog_host: str = None, syslog_protocol: str = None,
        syslog_text_format: str = None, tls: bool = None,
        syslog_port: int = None,
        enable_snmp: bool = None, snmp_host: str = None,
        snmp_port: int = None,
        snmp_version: int = None, snmp_security_model: str = None,
        certificate_authority_pem: str = None,
        snmp_username: str = None, snmp_password: str = None,
        tls_security_name: str = None,
        snmp_passphrase: str = None) -> dict:
    """Update Unisphere's Alert Notification Target Settings.

    The request body starts from a copy of ``payload`` (if given) and a
    category ("email", "snmp", "syslog") is only written when its
    ``enable_*`` flag is supplied. A flag left as None leaves that
    category exactly as it is in ``payload``; a truthy flag builds the
    full category from the keyword arguments; any other non-None value
    sends just ``{"enabled": <flag>}`` for that category.

    :param payload: Alert Notification Settings, can be a subset of the
                    overall payload from
                    get_alert_notification_target_config; the dict
                    passed in is not modified -- dict
    :param enable_email: Enable or disable email notifications -- bool
    :param email_server: Email server address -- string
    :param sender_address: Email sender address -- string
    :param email_port: Email port number -- int
    :param enable_syslog: Enable or disable syslog notifications -- bool
    :param syslog_host: Syslog server address -- string
    :param syslog_protocol: Syslog protocol (e.g., UDP, TCP) -- string
    :param syslog_text_format: Syslog text format (e.g., RFC5424)
                               -- string
    :param tls: Enable or disable TLS for syslog notifications -- bool
    :param syslog_port: Syslog port number -- int
    :param enable_snmp: Enable or disable SNMP notifications -- bool
    :param snmp_host: SNMP server address -- string
    :param snmp_port: SNMP port number -- int
    :param snmp_version: SNMP version (e.g., 1, 3); 3 and "3" are both
                         treated as SNMPv3 -- int
    :param snmp_security_model: SNMP security model (e.g. "usm" or
                                "tsm"), SNMPv3 only -- string
    :param certificate_authority_pem: cert in base64 format, for details
                                      on encoding see documentation at
                                      https://developer.dell.com/
                                      -- string
    :param snmp_username: SNMP username, SNMPv3 only -- string
    :param snmp_password: SNMP password, SNMPv3 only -- string
    :param tls_security_name: security name sent in the SNMPv3 "tls"
                              section; only used together with
                              certificate_authority_pem -- string
    :param snmp_passphrase: SNMP passphrase, SNMPv3 only -- string
    :returns: Unisphere's Alert Notification Settings -- dict
    :raises: ValueError if email or syslog is being enabled without
             all of its required fields
    """
    # Shallow copy so the caller's dict is never mutated by this call.
    payload = dict(payload) if payload else {}

    # Handle email settings
    if enable_email:
        # Validate before building anything for the request.
        if not all([email_server, sender_address, email_port]):
            raise ValueError(
                "enable_email is True but email server, sender address, "
                "or email port is missing.")
        payload["email"] = {
            "enabled": enable_email,
            "email_server": email_server,
            "sender_address": sender_address,
            "email_port": email_port
        }
    elif enable_email is not None:
        payload["email"] = {"enabled": enable_email}

    # Handle SNMP settings
    if enable_snmp:
        snmp_target = {
            "host": snmp_host,
            "port": snmp_port,
            "version": snmp_version
        }
        # SNMPv3 specific fields. The version is documented as an int,
        # so compare on the string form to accept both 3 and "3".
        if str(snmp_version) == "3":
            snmp_target["security_model"] = snmp_security_model
            snmp_target["authorization"] = {
                "username": snmp_username,
                "password": snmp_password,
                "passphrase": snmp_passphrase
            }
            if tls_security_name and certificate_authority_pem:
                snmp_target["tls"] = {
                    "security_name": tls_security_name,
                    "certificate_authority_pem": certificate_authority_pem
                }
        payload["snmp"] = {
            "enabled": enable_snmp,
            "snmp_trap_targets": [snmp_target]
        }
    elif enable_snmp is not None:
        payload["snmp"] = {"enabled": enable_snmp}

    # Handle syslog settings
    if enable_syslog:
        # tls may legitimately be False, so only None counts as missing.
        if not all([syslog_host, syslog_protocol, syslog_text_format,
                    tls is not None, syslog_port]):
            raise ValueError(
                "enable_syslog is True but one or more required syslog "
                "fields are missing.")
        payload["syslog"] = {
            "enabled": enable_syslog,
            "syslog_target": {
                "host": syslog_host,
                "protocol": syslog_protocol,
                "text_format": syslog_text_format,
                "tls": tls,
                "port": syslog_port
            }
        }
    elif enable_syslog is not None:
        # Mirrors the email/SNMP handling so syslog can be switched off.
        payload["syslog"] = {"enabled": enable_syslog}

    uri = f"/{self.common.UNI_VERSION}/settings/alert/notification"
    return self.common.create_resource(
        target_uri=uri, resource_type=None, payload=payload)
[docs]
def get_alert_policies(
        self, array_id: str = None, name: str = None, type: str = None,
        applicable: bool = None, enabled: bool = None,
        email_notifications: bool = None,
        snmp_notifications: bool = None,
        syslog_notifications: bool = None) -> dict:
    """List Alert Policies for a storage array.

    Every filter argument is forwarded as a query parameter of the same
    name on the GET request.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :param name: Optional filter on Alert Policy name, e.g. equal to
                 "name=arrayEvents" or contains "name=<like>array"
                 -- string
    :param type: Optional filter on alert policy category, possible
                 values are "array" and "file" -- string
    :param applicable: Optional filter on Alert Policies that are
                       applicable to the storage array (true/false)
                       -- bool
    :param enabled: Optional filter on Alert Policies that are enabled
                    (true/false) -- bool
    :param email_notifications: Optional filter on Alert Policies that
                                have email notifications enabled
                                (true/false) -- bool
    :param snmp_notifications: Optional filter on Alert Policies that
                               have SNMP notifications enabled
                               (true/false) -- bool
    :param syslog_notifications: Optional filter on Alert Policies that
                                 have syslog notifications enabled
                                 (true/false) -- bool
    :returns: response of the GET request -- dict
    """
    target_array = array_id or self.array_id
    filters = {
        "name": name,
        "type": type,
        "applicable": applicable,
        "enabled": enabled,
        "email_notifications": email_notifications,
        "snmp_notifications": snmp_notifications,
        "syslog_notifications": syslog_notifications,
    }
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/alert_policy")
    return self.common.get_request(
        target_uri=uri, resource_type=None, params=filters)
[docs]
def update_alert_policies(self, payload: dict,
                          array_id: str = None) -> dict:
    """Update Alert Policies for a storage array.

    :param payload: request body, can be user supplied or generated
                    from get_alert_policies, users can use a subset of
                    the payload if they only wish to do a partial
                    update -- dict
    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the modify request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/alert_policy")
    return self.common.modify_resource(
        target_uri=uri, resource_type=None, payload=payload)
[docs]
def get_host_access_control_configuration(self,
                                          array_id: str = None) -> dict:
    """Retrieve the Host Access Control Configuration of an array.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the GET request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/access_control/host_access")
    return self.common.get_request(target_uri=uri, resource_type=None)
[docs]
def get_authorization_rules(
        self, array_id: str = None, name: str = None,
        account_type: str = None,
        authority: str = None, qualifier: str = None,
        role: str = None) -> dict:
    """List Authorization Rules for a storage array.

    Every filter argument is forwarded as a query parameter of the same
    name on the GET request.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :param name: Optional filter on the name of the user or group, e.g.
                 equal to "name=testUser" or contains
                 "name=<like>test" -- string
    :param account_type: Optional filter on the account type, possible
                         values are "user" and "group" -- string
    :param authority: Supported authority types for Unisphere REST API
                      are: localDirectory, host, ldapSSL, windowsAD and
                      sso (Single Sign-On) or "none" -- string
    :param qualifier: Optional filter on the qualifier, e.g. equal to
                      "qualifier=test" or contains
                      "qualifier=<like>test" -- string
    :param role: Optional filter on the role, e.g. "role=admin"
                 -- string
    :returns: Authorization Rules -- dict
    """
    target_array = array_id or self.array_id
    filters = {
        "name": name,
        "account_type": account_type,
        "authority": authority,
        "qualifier": qualifier,
        "role": role,
    }
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/access_control/authorization_rule")
    return self.common.get_request(
        target_uri=uri, resource_type=None, params=filters)
[docs]
def create_authorization_rule(self, name: str, roles: list[str],
                              account_type: str = None,
                              authority: str = None, qualifier: str = None,
                              array_id: str = None,
                              local_rep: bool = False,
                              local_rep_storage_groups: str = None,
                              remote_rep: bool = False,
                              local_rep_wildcard: str = None,
                              remote_rep_storage_groups: str = None,
                              remote_rep_wildcard: str = None,
                              device_manage: bool = False,
                              device_manage_storage_groups: str = None,
                              device_manage_wildcard: str = None) -> dict:
    """Create an Authorization Rule on a storage array.

    Each entry of ``roles`` is sent as ``<role>: True``. The scoped roles
    local_rep, remote_rep and device_manage are added only when their
    flag is truthy, as a dict holding "enabled" plus "storage_groups"
    and/or "wildcards" when those values are not None.

    :param name: Name of the authorization rule -- string
    :param roles: List of roles (up to 4), e.g. ["admin", "monitor"].
                  Valid roles: admin, security_admin, storage_admin,
                  mainframe_admin, monitor, auditor, perf_monitor,
                  or "none" -- list
    :param account_type: Account type -- string
    :param authority: Source of the authority ("LocalDirectory",
                      "ldapSSL", "windowsAD", "host", "sso", "none")
                      -- string
    :param qualifier: Domain or Hostname -- string
    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :param local_rep: Enable local replication -- bool
    :param local_rep_storage_groups: Comma-separated list of SGs
                                     -- string
    :param remote_rep: Enable remote replication -- bool
    :param local_rep_wildcard: Local rep wildcard -- string
    :param remote_rep_storage_groups: Comma-separated list of SGs
                                      -- string
    :param remote_rep_wildcard: Remote rep wildcard -- string
    :param device_manage: Enable device management -- bool
    :param device_manage_storage_groups: Comma-separated list of SGs
                                         -- string
    :param device_manage_wildcard: Device management wildcard -- string
    :returns: response of the create request -- dict
    :raises: ValueError if more than 4 roles are given, or "none" is
             combined with any other role
    """
    role_count = len(roles)
    if role_count > 4:
        raise ValueError("A user/group can have a maximum of 4 roles.")
    if role_count > 1 and "none" in roles:
        raise ValueError("'none' cannot be combined with other roles.")

    role_map = {}
    for role in roles:
        role_map[role] = True

    # (payload key, enabled flag, storage groups, wildcards), kept in the
    # order the keys are added to the request body.
    scoped_roles = (
        ("local_rep", local_rep,
         local_rep_storage_groups, local_rep_wildcard),
        ("remote_rep", remote_rep,
         remote_rep_storage_groups, remote_rep_wildcard),
        ("device_manage", device_manage,
         device_manage_storage_groups, device_manage_wildcard),
    )
    for key, flag, storage_groups, wildcards in scoped_roles:
        if not flag:
            continue
        scoped = {"enabled": flag}
        if storage_groups is not None:
            scoped["storage_groups"] = storage_groups
        if wildcards is not None:
            scoped["wildcards"] = wildcards
        role_map[key] = scoped

    body = {
        "name": name,
        "account_type": account_type,
        "authority": authority,
        "qualifier": qualifier,
        "roles": role_map
    }
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/access_control/authorization_rule")
    return self.common.create_resource(
        target_uri=uri, resource_type=None, payload=body)
[docs]
def update_authorization_rules(
        self, payload: dict, array_id: str = None) -> dict:
    """Update Authorization Rules on a storage array.

    :param payload: request body, can be user supplied or generated
                    from get_authorization_rules, users can use a
                    subset of the payload if they only wish to do a
                    partial update -- dict
    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the modify request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/access_control/authorization_rule")
    return self.common.modify_resource(
        target_uri=uri, resource_type=None, payload=payload)
[docs]
def get_system_thresholds(self, array_id: str = None, name: str = None,
                          applicable: bool = None, type: str = None,
                          unit: str = None, warning_threshold: int = None,
                          critical_threshold: int = None,
                          fatal_threshold: int = None,
                          email_notifications: bool = None,
                          snmp_notifications: bool = None,
                          syslog_notifications: bool = None) -> dict:
    """List System Threshold Alert Configuration for specified Array.

    Every filter argument is forwarded as a query parameter of the same
    name on the GET request.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :param name: value forwarded as the "name" query parameter
                 -- string
    :param applicable: Optional filter on System Thresholds that are
                       applicable to the storage array (true/false)
                       -- bool
    :param type: Optional filter on system threshold category, possible
                 values are "system", "srp" and "storage_container"
                 -- string
    :param unit: Optional filter on the threshold unit, possible values
                 are "percent" -- string
    :param warning_threshold: Optional filter on System Thresholds that
                              are lt, gt or eq to the specified warning
                              threshold -- int
    :param critical_threshold: Optional filter on System Thresholds
                               that are lt, gt or eq to the specified
                               critical threshold -- int
    :param fatal_threshold: Optional filter on System Thresholds that
                            are lt, gt or eq to the specified fatal
                            threshold -- int
    :param email_notifications: Optional filter on System Thresholds
                                that have email notifications enabled
                                (true/false) -- bool
    :param snmp_notifications: Optional filter on System Thresholds
                               that have SNMP notifications enabled
                               (true/false) -- bool
    :param syslog_notifications: Optional filter on System Thresholds
                                 that have syslog notifications enabled
                                 (true/false) -- bool
    :returns: response of the GET request -- dict
    """
    target_array = array_id or self.array_id
    filters = {
        "name": name,
        "applicable": applicable,
        "type": type,
        "unit": unit,
        "warning_threshold": warning_threshold,
        "critical_threshold": critical_threshold,
        "fatal_threshold": fatal_threshold,
        "email_notifications": email_notifications,
        "snmp_notifications": snmp_notifications,
        "syslog_notifications": syslog_notifications,
    }
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/system_threshold")
    return self.common.get_request(
        target_uri=uri, resource_type=None, params=filters)
[docs]
def update_system_thresholds(self, payload: dict,
                             array_id: str = None) -> dict:
    """Update System Thresholds on a storage array.

    :param payload: request body, user built or generated from
                    get_system_thresholds -- dict
    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the modify request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/system_threshold")
    return self.common.modify_resource(
        target_uri=uri, resource_type=None, payload=payload)
[docs]
def get_alert_notification_agent_details(self) -> dict:
    """Retrieve the Alert Notification Agent Details.

    Issues a GET on the settings/alert/notification/agent_details
    resource.

    :returns: response of the GET request -- dict
    """
    version = self.common.UNI_VERSION
    uri = f"/{version}/settings/alert/notification/agent_details"
    return self.common.get_request(target_uri=uri, resource_type=None)
[docs]
def get_alert_notification_settings(self, array_id: str = None) -> dict:
    """Retrieve the System Alert Notification Settings of an array.

    Shows configuration of Alert Severity for System and Performance
    Alerts, per array. Also shows subscription of Alert Notification for
    email subscribers.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the GET request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/notification")
    return self.common.get_request(target_uri=uri, resource_type=None)
[docs]
def update_alert_notifications(
        self, payload: dict, array_id: str = None) -> dict:
    """Set/update Alert Notification Settings of an array.

    Defines what alert severity is sent for System and Performance
    Alerts, per array. Also sets subscription of Alert Notification for
    email subscribers.

    :param payload: request body, user built or generated from
                    get_alert_notification_settings
                    sample_payload = {'alert_severity': {
                        'system': {'fatal': True,
                                   'critical': True,
                                   'warning': True,
                                   'normal': False,
                                   'info': False},
                        'performance': {'critical': True,
                                        'warning': True,
                                        'info': False}},
                        'subscriptions': {
                            'system': ['random.person@dell.com'],
                            'performance': ['random.person@dell.com'],
                            'jobs': [],
                            'reports': ['random.person@dell.com'],
                            'none': []}} -- dict
    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the modify request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/notification")
    return self.common.modify_resource(
        target_uri=uri, resource_type=None, payload=payload)
[docs]
def get_storage_group_compliance_policy_allocation(
        self, array_id: str = None) -> dict:
    """Retrieve the Storage Group Compliance Policy Allocation.

    Issues a GET with an empty set of query parameters.

    :param array_id: The storage array ID, the instance's array_id is
                     used when this is not supplied -- string
    :returns: response of the GET request -- dict
    """
    target_array = array_id or self.array_id
    uri = (f"/{self.common.UNI_VERSION}/settings/symmetrix/"
           f"{target_array}/alert/compliance_alert_policy/storage_group")
    return self.common.get_request(
        target_uri=uri, resource_type=None, params={})