# -*- coding: utf-8 -*-
from copy import deepcopy
from urllib2 import unquote
from itertools import chain
import jsonschema
import re
import logging
logger = logging.getLogger(__name__)
from werkzeug.datastructures import MultiDict, Headers
from .errors import ValidationErrors, FieldError
from .validators import SwaggerValidator
# These locations are *not* intended for body params which should already
# be the correct type. All other param locations are strings.
STRINGLY_LOCATIONS = ['path', 'query', 'headers']
VALID_LOCATIONS = STRINGLY_LOCATIONS + ['body']
[docs]class OperationNotFound(Exception):
"""
Raised when an operation cannot be found in a schema.
"""
[docs]class InvalidPath(Exception):
"""
Raised when a path does not match an operation's route.
"""
[docs]def find_operation(schema, operation_id):
"""
Returns a tuple of the route, method and schema for an operation.
:raises: :class:`OperationNotFound`
When the operation_id does not exist anywhere in the sec.
"""
# TODO a reverse index would be quicker but would require guarantees
# that the schema dictionary is immutable.
for route, schema in schema['paths'].iteritems():
for method, subschema in schema.iteritems():
if subschema.get('operationId') == operation_id:
return (route, method, subschema)
raise OperationNotFound('Could not find {}'.format(operation_id))
[docs]def coerce_param(value, schema):
"""
Coerces a named parameter within a path, query, or headers to the
type specified in the appropriate schema. If the string is not
properly formated, raise a FieldError.
:param string value:
The stringified value
:param dict schema:
The schema dictionary for the parameter.
:raises: :class:`~spectastic.errors.FieldError`
When the primitive type is not coercible.
:return:
The value, coerced according to the parameter definition for the
field named ``name`` located in ``location``.
"""
location = schema['in']
name = schema['name']
if location not in STRINGLY_LOCATIONS:
raise ValueError('Field location invalid, should be in: {}'.format(
', '.join(STRINGLY_LOCATIONS)
))
# Open API defines 3 primitive types. They're simple enough to handle
# within these conditionals without resorting to specialized methods.
if schema['type'] == 'integer':
if not re.match('^\-?[0-9]|[1-9][0-9]*$', value):
raise FieldError(
'Field {} is not an integer'.format(name),
location,
name,
)
try:
return int(value)
except ValueError:
raise FieldError(
'Field {} is not an integer'.format(name),
location,
name,
)
elif schema['type'] == 'number':
try:
return float(value)
except ValueError:
raise FieldError(
'Field {} is not a number'.format(name),
location,
name
)
elif schema['type'] == 'boolean':
if value not in ['true', 'false']:
raise FieldError(
# flake8: noqa
'Field {} is not a boolean, expected `true` or `false` got `{}'.format(name, value),
location,
name
)
elif value == 'true':
return True
elif value == 'false':
return False
else:
raise RuntimeError('Botched boolean cooercion')
elif schema['type'] == 'string':
return value
else:
raise ValueError('Unknown type specified in schema {}'.format(
schema['type']
))
[docs]class Operation(object):
@staticmethod
[docs] def from_schema(schema, operation_id):
route, method, local_schema = find_operation(schema, operation_id)
return Operation(schema, route, method, local_schema)
def __init__(self, schema, route, method, local_schema):
"""
:param dict schema: The overall schema for this API.
:param dict local_schema: The schema specific to this operation.
:param string route: The route for this operation.
:param string method: The http method for this operation.
"""
self.schema = schema
self.local_schema = local_schema
self.route = route
self.method = method.upper()
# Convenience methods for accessing parameter types for this operation.
# ---------------------------------------------------------------------
def _param_schemas(self, location):
if location not in ['query', 'body', 'path', 'header']:
raise ValueError('Unknown parameter location: {}'.format(location))
return {
schema['name'].lower(): schema for schema in
self.local_schema['parameters']
if schema['in'] == location
}
[docs] def query_schemas(self):
"""
Returns a list of all query parameter schemas.
"""
return self._param_schemas('query')
[docs] def path_schemas(self):
"""
Returns a list of all path parameter schemas.
"""
return self._param_schemas('path')
[docs] def body_schema(self):
"""
Returns a tuple of consisting of the body parameter and it's corresponding
body schema.
"""
for schema in self.local_schema['parameters']:
if schema['in'] == 'body':
return schema, schema['schema']
else:
return None, {}
# Param munging. Because non-body input params arrive as strings.
# Singular, case insensitive-accessors for schemas on a named parameter.
# ----------------------------------------------------------------------
[docs] def path_schema(self, path_param_name):
"""
Returns the schema for a path parameter of a given name. The parameter
name is case-insensitive.
:raises: :class:`KeyError`
If the query is not found.
"""
path_schemas = self.path_schemas()
return path_schemas[path_param_name.lower()]
[docs] def query_schema(self, query_param_name):
"""
Returns the schema for a query parameter of a given name. The parameter
name is case-insensitive.
:raises: :class:`KeyError`
If the query is not found.
"""
query_schemas = self.query_schemas()
return query_schemas[query_param_name.lower()]
@property
def validator(self):
if not hasattr(self, '_validator'):
self._validator = SwaggerValidator(
self.schema, format_checker=jsonschema.draft4_format_checker,
)
return self._validator
@property
def _path_matcher(self):
"""
Returns a memoized, compiled regular expression matcher built for
this operations route.
"""
if not hasattr(self, '__path_matcher'):
path_schemas = self.path_schemas()
path_subs = {
path_schema['name']: '(?P<{}>[^/]*)'.format(
path_schema['name']
) for path_schema in path_schemas.values()
}
# Swagger routes are conveniently easy to substitute.
path_expr = '^{}/{}$'.format(
self.schema.get('basePath','').rstrip('/'),
self.route.lstrip('/').format(**path_subs)
)
# If there's still braces left in the route we've missed a parameter.
# in the schema and it's invalid.
if '{' in path_expr or '}' in path_expr:
# flake8: noqa
raise RuntimeError('Unmatched parameters remain after route substitution: {}'.format(path_expr))
self.__path_matcher = re.compile(path_expr)
return self.__path_matcher
# Iterative validation methods for each parameter location.
# ---------------------------------------------------------
[docs] def iter_request_body_errors(self, request_body):
"""
Validates a request body against the schema, yielding
a :class:`~spectastic.errors.FieldError` for each failure.
"""
body_param, body_schema = self.body_schema()
if body_param:
if body_param.get('required') == True \
and not hasattr(request_body, 'iteritems'):
yield FieldError(
"A request body is required and must be an object",
'body',
body_param['name']
)
return
for error in self.validator.iter_errors(request_body, body_schema):
# Relies on our overriden 'required' validator to provide
# an error path.
path = error.path
yield FieldError(error.message, 'body', '.'.join(map(
str, path
)))
[docs] def iter_request_path_errors(self, request_path):
"""
Validates a request path against the schema, yielding
a :class:`~spectastic.errors.FieldError` for each failure.
"""
path_schemas = self.path_schemas()
if len(path_schemas) == 0:
return
params = self.extract_path_args(request_path)
for param, value in params.iteritems():
param = param.lower()
# The input is a string that we need to type case at least partially
# before passing to jsonschema, which expects the primitive type
# to be correct (as it's intended for use with json data).
if param in path_schemas:
schema = self.path_schema(param)
try:
value = coerce_param(value, schema)
except FieldError:
yield FieldError(
'Invalid type for {}'.format(schema['name']),
'path',
schema['name'],
)
for error in self.validator.iter_errors(value, schema):
yield FieldError(
error.message, 'path', schema['name']
)
for name, schema in path_schemas.iteritems():
if schema.get('required') == True and schema['name'] not in params:
yield FieldError(
'Required path parameter is missing',
'path',
schema.get('name')
)
[docs] def iter_request_query_errors(self, query_params):
"""
Validates a request query against the schema, yielding
a :class:`~spectastic.errors.FieldError` for each failure.
"""
query_schemas = self.query_schemas()
if len(query_schemas) == 0:
return
# For MultiDicts, which are preferred because http requests support
# multiple query parameters of the same name, we pass the multi
# argument to ensure we iterate over every query parameter.
if isinstance(query_params, MultiDict):
iterargs = {'multi': True}
else:
iterargs = {}
for name, value in query_params.iteritems(**iterargs):
if name in query_schemas:
schema = deepcopy(self.query_schema(name))
# The required validator won't work in the context of a single
# value. We remove it validate it in a subsequent loop.
if 'required' in schema:
schema.pop('required')
try:
value = coerce_param(value, schema)
except FieldError:
yield FieldError(
'Invalid type for {}'.format(schema['name']),
'path',
schema['name'],
)
for error in self.validator.iter_errors(value, schema):
yield FieldError(
error.message,
'query',
schema['name']
)
for name, schema in query_schemas.iteritems():
if schema.get('required', False) and name not in query_params:
yield FieldError(
'Required query parameter is missing',
'query',
schema.get('name')
)
[docs] def iter_request_errors(self, request):
"""
Validates an entire request, yielding a
:class:`~spectastic.errors.FieldError` for each failure.
:param ~spectastic.request.BasicRequest request:
A request conforming to the structure outlined in
:class:`~spectastic.request.BasicRequest`
"""
for error in chain(
self.iter_request_body_errors(request.body),
self.iter_request_path_errors(request.path),
self.iter_request_query_errors(request.query),
self.iter_request_header_errors(request.headers),
):
yield error
# ValidationError raising methods for each parameter location
# -----------------------------------------------------------
[docs] def validate_request_body(self, request_body):
"""
Validates a request body against the operation schema.
:raises: :class:`~spectastic.errors.ValidationErrors`
When validation fails.
:return bool: True on success.
"""
errors = list(self.iter_request_body_errors(request_body))
if errors:
raise ValidationErrors(errors)
else:
return True
[docs] def validate_request_path(self, path_arguments):
"""
Validates headers against the operation.
:raises: :class:`~spectastic.errors.ValidationErrors`
When validation fails.
:return bool: True on success.
"""
errors = list(self.iter_request_path_errors(path_arguments))
if errors:
raise ValidationErrors(errors)
else:
return True
[docs] def validate_request_query(self, query_params):
"""
Validates the query parameters from a request.
:param dict|werkzeug.datastructures.MultiDict query_params:
A dictionary like object of query parameters.
:raises: :class:`~spectastic.errors.ValidationErrors`
When validation fails.
:return bool: True on success.
"""
errors = list(self.iter_request_query_errors(query_params))
if errors:
raise ValidationErrors(errors)
else:
return True
[docs] def validate_request(self, request):
"""
Validates all components of a request.
:param request:
A request conforming to the structure outlined in
:class:`~spectastic.request.BasicRequest`
:raises: :class:`~spectastic.errors.ValidationErrors`
When validation fails.
:return bool: True on success.
"""
errors = list(self.iter_request_errors(request))
if errors:
raise ValidationErrors(errors)
else:
return True