# Copyright (c) 2020 Dell Inc. or its subsidiaries.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""common.py."""
import json
import logging
import math
import re
import six
import socket
import time
from PyU4V.utils import constants
from PyU4V.utils import exception
LOG = logging.getLogger(__name__)
# HTTP constants
GET = constants.GET
POST = constants.POST
PUT = constants.PUT
DELETE = constants.DELETE
# Status code constants
STATUS_200 = constants.STATUS_200
STATUS_201 = constants.STATUS_201
STATUS_202 = constants.STATUS_202
STATUS_204 = constants.STATUS_204
STATUS_401 = constants.STATUS_401
STATUS_404 = constants.STATUS_404
# Job constants
INCOMPLETE_LIST = constants.INCOMPLETE_LIST
CREATED = constants.CREATED
SUCCEEDED = constants.SUCCEEDED
# Resource constants
SYSTEM = constants.SYSTEM
JOB = constants.JOB
VERSION = constants.VERSION
SYMMETRIX = constants.SYMMETRIX
SLOPROVISIONING = constants.SLOPROVISIONING
COMMON = constants.COMMON
ITERATOR = constants.ITERATOR
PAGE = constants.PAGE
WLP = constants.WLP
HEADROOM = constants.HEADROOM
[docs]
class CommonFunctions(object):
"""CommonFunctions."""
def __init__(self, rest_client):
"""__init__."""
self.rest_client = rest_client
self.request = self.rest_client.rest_request
self.interval = self.rest_client.interval
self.retries = self.rest_client.retries
self.UNI_VERSION = constants.UNISPHERE_VERSION
[docs]
def wait_for_job_complete(self, job):
"""Given the job wait for it to complete.
:param job: job details -- dict
:returns: response code, result, status, task details -- int, str, str,
list
:raises: VolumeBackendAPIException
"""
res, tasks = None, None
if job['status'].lower() == SUCCEEDED:
try:
res, tasks = job['result'], job['task']
except KeyError:
pass
return 0, res, job['status'], tasks
def _wait_for_job_complete():
"""Called at an interval until the job is finished."""
retries = kwargs['retries']
try:
kwargs['retries'] = retries + 1
if not kwargs['wait_for_job_called']:
is_complete, result, rc, status, task = (
self._is_job_finished(job_id))
if is_complete is True:
kwargs['wait_for_job_called'] = True
kwargs['rc'], kwargs['status'] = rc, status
kwargs['result'], kwargs['task'] = result, task
else:
kwargs['status'], kwargs['task'] = status, task
except Exception as error:
exception_message = 'Issue encountered waiting for job.'
LOG.exception(exception_message)
raise exception.VolumeBackendAPIException(
data=exception_message) from error
return kwargs
job_id = job['jobId']
kwargs = {'retries': 0, 'wait_for_job_called': False,
'rc': 0, 'result': None}
while not kwargs['wait_for_job_called']:
time.sleep(self.interval)
kwargs = _wait_for_job_complete()
if kwargs['retries'] > self.retries:
LOG.error('_wait_for_job_complete failed after {cnt} '
'tries.'.format(cnt=kwargs['retries']))
kwargs['rc'], kwargs['result'] = -1, kwargs['result']
break
LOG.debug('Return code is: {rc}. Result is {res}.'.format(
rc=kwargs['rc'], res=kwargs['result']))
return (kwargs['rc'], kwargs['result'],
kwargs['status'], kwargs['task'])
[docs]
def get_job_by_id(self, job_id):
"""Get details of a specific job.
:param job_id: job id -- str
:returns: job details -- dict
"""
return self.get_resource(category=SYSTEM, resource_level=JOB,
resource_level_id=job_id)
def _is_job_finished(self, job_id):
"""Check if the job is finished.
:param job_id: job id -- str
:returns: job complete, result, response code, status, task
details -- bool, str, int, str, list
"""
complete, rc, status, result, task = False, 0, None, None, None
job = self.get_job_by_id(job_id)
if job:
status = job['status']
try:
result, task = job['result'], job['task']
except KeyError:
pass
if status.lower() == SUCCEEDED:
complete = True
elif status.lower() in INCOMPLETE_LIST:
complete = False
else:
rc, complete = -1, True
return complete, result, rc, status, task
[docs]
@staticmethod
def check_status_code_success(operation, status_code, message):
"""Check if a status code indicates success.
:param operation: operation being performed -- str
:param status_code: status code -- int
:param message: server response -- str
:raises: VolumeBackendAPIException
"""
if status_code not in [STATUS_200, STATUS_201,
STATUS_202, STATUS_204]:
exception_message = (
'Error {op}. The status code received is {sc} and the message '
'is {msg}.'.format(op=operation, sc=status_code, msg=message))
if status_code == STATUS_404:
raise exception.ResourceNotFoundException(
data=exception_message)
if status_code == STATUS_401:
raise exception.UnauthorizedRequestException()
raise exception.VolumeBackendAPIException(
data=exception_message)
[docs]
def wait_for_job(self, operation, status_code, job):
"""Check if call is async, wait for it to complete.
:param operation: operation being performed -- str
:param status_code: status code -- int
:param job: job id -- str
:returns: task details -- list
:raises: VolumeBackendAPIException
"""
task = None
if status_code == STATUS_202:
rc, result, status, task = self.wait_for_job_complete(job)
if rc != 0:
exception_message = (
'Error {op}. Status code: {sc}. Error: {err}. '
'Status: {st}.'.format(
op=operation, sc=rc, err=six.text_type(result),
st=status))
LOG.error(exception_message)
raise exception.VolumeBackendAPIException(
data=exception_message)
return task
[docs]
def build_target_uri(self, **kwargs):
"""Build the target URI.
This function calls into _build_uri() for access outside this class.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:returns: target URI -- str
"""
return self._build_uri(**kwargs)
def _build_uri(self, **kwargs):
"""Build the target URI.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:returns: target URI -- str
"""
target_uri, version = str(), None
if kwargs.get('category') not in ['performance', 'common']:
version = self._build_uri_get_version(kwargs.get('version'),
kwargs.get('no_version'))
if version:
target_uri += '/{version}'.format(version=version)
target_uri += '/{category}'.format(
category=kwargs.get('category'))
if kwargs.get('resource_level'):
target_uri += '/{resource_level}'.format(
resource_level=kwargs.get('resource_level'))
if kwargs.get('resource_level_id'):
target_uri += '/{resource_level_id}'.format(
resource_level_id=kwargs.get('resource_level_id'))
if kwargs.get('resource_type'):
target_uri += '/{resource_type}'.format(
resource_type=kwargs.get('resource_type'))
if kwargs.get('resource_type_id'):
target_uri += '/{resource_type_id}'.format(
resource_type_id=kwargs.get('resource_type_id'))
if kwargs.get('resource'):
target_uri += '/{resource}'.format(
resource=kwargs.get('resource'))
if kwargs.get('resource_id'):
target_uri += '/{resource_id}'.format(
resource_id=kwargs.get('resource_id'))
if kwargs.get('object_type'):
target_uri += '/{object_type}'.format(
object_type=kwargs.get('object_type'))
if kwargs.get('object_type_id'):
target_uri += '/{object_type_id}'.format(
object_type_id=kwargs.get('object_type_id'))
return target_uri
def _build_uri_get_version(self, version=None, no_version=False):
"""Get the Unisphere version for the target URI.
:param version: version to use from kwargs -- str
:param no_version: if URI should be versionless -- bool
:returns: version -- str
"""
if not version and no_version:
version = None
elif not version and not no_version:
version = self.UNI_VERSION
elif version and no_version:
LOG.debug(
'Version has been specified along with no_version flag, '
'ignoring no_version flag and using version {ver}'.format(
ver=version))
return version
[docs]
def get_request(self, target_uri, resource_type, params=None):
"""Send a GET request to the array.
:param target_uri: target uri -- str
:param resource_type: the resource type, e.g. maskingview -- str
:param params: optional filter params -- dict
:returns: resource_object -- dict
:raises: ResourceNotFoundException
"""
message, sc = self.request(target_uri, GET, params=params)
operation = 'GET {resource_type}'.format(resource_type=resource_type)
self.check_status_code_success(operation, sc, message)
return message
[docs]
def get_resource(self, *args, **kwargs):
"""Get resource details from the array.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key params: query parameters -- dict
:returns: resource object -- dict
"""
target_uri = self._build_uri(**kwargs)
resource_type = None
if args:
resource_type = args[2]
elif not args and kwargs:
resource_type = kwargs.get('resource_level')
return self.get_request(
target_uri, resource_type, kwargs.get('params'))
[docs]
def create_resource(self, *args, **kwargs):
"""Create a resource.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key payload: query parameters -- dict
:returns: resource object -- dict
"""
target_uri = self._build_uri(**kwargs)
if kwargs.get('target_uri'):
target_uri = (kwargs.get('target_uri'))
message, status_code = self.request(
target_uri, POST, request_object=kwargs.get('payload'))
resource_type = None
if args:
resource_type = args[2]
elif not args and kwargs:
resource_type = kwargs.get('resource_level')
operation = ('POST {resource_type} resource'.format(
resource_type=resource_type))
self.check_status_code_success(operation, status_code, message)
return message
[docs]
def modify_resource(self, *args, **kwargs):
"""Modify a resource.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key payload: query parameters
:returns: resource object -- dict
"""
target_uri = self._build_uri(**kwargs)
if kwargs.get('target_uri'):
target_uri = (kwargs.get('target_uri'))
message, status_code = self.request(
target_uri, PUT, request_object=kwargs.get('payload'))
resource_type = None
if args:
resource_type = args[2]
elif not args and kwargs:
resource_type = kwargs.get('resource_level')
operation = ('PUT {resource_type} resource'.format(
resource_type=resource_type))
self.check_status_code_success(operation, status_code, message)
return message
[docs]
def delete_resource(self, *args, **kwargs):
"""Delete a resource.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key payload: query parameters
"""
target_uri = self._build_uri(**kwargs)
if kwargs.get('target_uri'):
target_uri = (kwargs.get('target_uri'))
message, status_code = self.request(
target_uri, DELETE, request_object=kwargs.get('payload'),
params=kwargs.get('params'))
resource_type = None
if args:
resource_type = args[2]
elif not args and kwargs:
resource_type = kwargs.get('resource_level')
operation = ('DELETE {resource_type} resource'.format(
resource_type=resource_type))
self.check_status_code_success(operation, status_code, message)
[docs]
def download_file(self, **kwargs):
"""Download a file.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key payload: query parameters -- dict
:returns: file info including binary data -- dict
:raises: ValueError
"""
target_uri = self._build_uri(**kwargs)
if kwargs.get('target_uri'):
target_uri = (kwargs.get('target_uri'))
response, status_code = self.rest_client.file_transfer_request(
method=POST, download=True, uri=target_uri,
timeout=kwargs.get('timeout'), r_obj=kwargs.get('payload'))
try:
message = response.raw.reason
operation = ('download {resource_type} resource'.format(
resource_type=kwargs.get('resource_level')))
self.check_status_code_success(operation, status_code, message)
except ValueError:
LOG.error(
'There request to download from {uri} has failed and no '
'message has been returned from Unisphere. Please check '
'Unisphere REST logs for further details.'.format(
uri=target_uri))
return response
[docs]
def upload_file(self, **kwargs):
"""Upload a file.
:key version: Unisphere version -- int
:key no_version: if versionless uri -- bool
:key category: resource category e.g. sloprovisioning -- str
:key resource_level: resource level e.g. storagegroup -- str
:key resource_level_id: resource level id -- str
:key resource_type: optional name of resource -- str
:key resource_type_id: optional name of resource -- str
:key resource: optional name of resource -- str
:key resource_id: optional name of resource -- str
:key object_type: optional name of resource -- str
:key object_type_id: optional name of resource -- str
:key form_data: multipart form data -- dict
:returns: response success details -- dict
"""
response_content = dict()
target_uri = self._build_uri(**kwargs)
if kwargs.get('target_uri'):
target_uri = (kwargs.get('target_uri'))
response, status_code = self.rest_client.file_transfer_request(
method=POST, upload=True, uri=target_uri,
form_data=kwargs.get('form_data'))
try:
response_content = json.loads(response.text)
msg = response_content.get('message')
operation = ('upload {resource_type} resource'.format(
resource_type=kwargs.get('resource_level')))
self.check_status_code_success(operation, status_code, msg)
# Workaround until failed responses do not return 200
if not response_content.get('success', False):
LOG.error(msg)
raise exception.VolumeBackendAPIException(msg)
LOG.info('The settings upload request was successful.')
except ValueError:
LOG.error(
'There request to upload to {uri} has failed and no '
'message has been returned from Unisphere. Please check '
'Unisphere REST logs for further details.'.format(
uri=target_uri))
return response_content
[docs]
def get_uni_version(self):
"""Get the unisphere version from the server.
:returns: version and major_version e.g. "V10.0.0.0", "100" -- str, str
"""
version, major_version = None, None
response = self.get_resource(category=VERSION, no_version=True)
if response and response.get('version'):
version = response['version']
version_list = version.split('.')
major_version = version_list[0][1:] + version_list[1]
return version, major_version
[docs]
def get_uni_version_info(self):
"""Get the unisphere version from the server.
:returns: {'version': 'T10.1.0.468', 'api_version': '101',
'supported_api_versions': ['101', '100', '92']} -- dict
"""
response = self.get_resource(category=VERSION, no_version=True)
return response
[docs]
def get_array_list(self, filters=None):
"""Return a list of arrays.
:param filters: optional filters -- dict
:returns: arrays ids -- list
"""
response = self.get_resource(
category=SYSTEM, resource_level=SYMMETRIX, params=filters)
return response.get('symmetrixId', list()) if response else list()
[docs]
def get_v3_or_newer_array_list(self, filters=None):
"""Return a list of V3 or newer arrays in the environment.
:param filters: optional filters -- dict
:returns: arrays ids -- list
"""
response = self.get_resource(
category=SLOPROVISIONING, resource_level=SYMMETRIX, params=filters)
return response.get('symmetrixId', list()) if response else list()
[docs]
def get_array(self, array_id):
"""Get array details.
:param array_id: array id -- str
:returns: array details -- dict
"""
return self.get_resource(category=SYSTEM, resource_level=SYMMETRIX,
resource_level_id=array_id)
[docs]
def get_iterator_page_list(self, iterator_id, start, end):
"""Get a page of results from an iterator instance.
:param iterator_id: iterator id -- str
:param start: the start number -- int
:param end: the end number -- int
:returns: iterator page results -- dict
"""
response = self.get_resource(
no_version=True, category=COMMON, resource_level=ITERATOR,
resource_level_id=iterator_id, resource_type=PAGE,
params={'from': start, 'to': end})
return response.get('result', list()) if response else list()
[docs]
def get_iterator_results(self, rest_response):
"""Get all results from all pages of an iterator if count > 1000.
:param rest_response: response JSON from REST API -- dict
:returns: all results -- dict
"""
full_response = list()
full_response += rest_response['resultList']['result']
if rest_response.get('count') and int(rest_response.get('count')) > 0:
count = rest_response.get('count')
max_page_size = rest_response.get('maxPageSize')
if int(count) > int(max_page_size):
total_iterations = int(math.ceil(count / float(max_page_size)))
iterator_id = rest_response.get('id')
# We skip to second page as we already have the first page in
# the input param rest_response
for x in range(1, total_iterations):
start = x * max_page_size + 1
end = (x + 1) * max_page_size
if end > count:
end = count
full_response += self.get_iterator_page_list(iterator_id,
start, end)
return full_response
[docs]
@staticmethod
def check_ipv4(ipv4):
"""Check if a given string is a valid ipv6 address
:param ipv4: ipv4 address -- str
:returns: string is valid ipv4 address -- bool
"""
try:
socket.inet_pton(socket.AF_INET, ipv4)
return True
except socket.error:
return False
[docs]
@staticmethod
def check_ipv6(ipv6):
"""Check if a given string is a valid ipv6 address
:param ipv6: ipv6 address -- str
:returns: string is valid ipv6 address -- bool
"""
try:
socket.inet_pton(socket.AF_INET6, ipv6)
return True
except socket.error:
return False
[docs]
@staticmethod
def convert_to_snake_case(camel_case_string):
"""Convert a string from camel case to snake case.
:param camel_case_string: string for formatting -- str
:returns: snake case variant -- str
"""
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_case_string)
s2 = re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
return s2.replace('__', '_')
[docs]
@staticmethod
def check_timestamp(in_timestamp):
"""Check that the timestamp is in the correct format
:param in_timestamp: timestamp e.g. 2020-11-24 15:00 -- str
"""
pattern = (r'^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[1-2][0-9]|3[0-1]) '
r'(2[0-3]|[01][0-9]):[0-5][0-9]$')
return re.match(pattern, in_timestamp)
[docs]
@staticmethod
def check_epoch_timestamp(in_epoch_timestamp):
"""Check that the timestamp is in the correct format
:param in_epoch_timestamp: timestamp e.g. 1554332400 -- str
"""
pattern1 = r'^[0-9]{10}$'
pattern2 = r'^[0-9]{13}$'
return re.match(pattern1, in_epoch_timestamp) or re.match(
pattern2, in_epoch_timestamp)
[docs]
def is_array_v4(self, array_id):
"""Check to see if array is a v4
:param array_id: the array serial number
:returns: bool
"""
is_v4 = False
array_details = self.get_array(array_id)
if array_details:
ucode_version = array_details.get(
'ucode') or array_details.get('microcode')
if ucode_version:
major_version = ucode_version.split('.')[0]
if major_version >= constants.UCODE_6079:
is_v4 = True
return is_v4