"""task.py: Computational tasks with properties"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from ..base import HasProperties, Instance
from ..basic import Bool, Float, String
from ..utils import ValidationError
[docs]class BaseOutput(HasProperties):
"""HasProperties object with the result of a computation"""
success = Bool(
'Did the task succeed',
default=True,
)
log = String(
'Output log messages from the task',
default='',
)
[docs]class TaskStatus(HasProperties):
"""HasProperties object to indicate present status of the task"""
progress = Float(
'Task progress to completion',
required=False,
min=0,
max=1,
)
message = String(
'Task progress message',
required=False,
)
[docs]class BaseTask(object):
"""Class for defining a computational task
Input and Output class attributes must be subclasses of BaseInput and
BaseOutput respectively. Task is executed by calling an instance
of the task with Input property/value pairs as keyword arguments.
"""
_REGISTRY = dict()
Input = BaseInput
Output = BaseOutput
[docs] def __call__(self, **kwargs):
"""Execute the task
Keyword arguments are used to construct Input instance. This is
validated and passed to :code:`run`. The Output of :code:`run`
is validated, passed to :code:`process_output`, and returned.
"""
input_obj = self.Input.deserialize(kwargs)
input_obj.validate()
output_obj = self.run(input_obj)
if not isinstance(output_obj, BaseOutput):
raise ValidationError(
message='Invalid task output class: {}'.format(
output_obj.__class__.__name__,
),
reason='invalid_class',
instance=(
output_obj if isinstance(output_obj, HasProperties)
else None
),
)
output_obj.validate()
return self.process_output(output_obj)
[docs] def report_status(self, status):
"""Hook for reporting the task status towards completion"""
status = Instance('', TaskStatus).validate(None, status)
print(r'{taskname} | {percent:>3}% | {message}'.format(
taskname=self.__class__.__name__,
percent=int(round(100*status.progress)),
message=status.message if status.message else '',
))
[docs] def process_output(self, output_obj): #pylint: disable=no-self-use
"""Processes valid Output object into desired task output
This method is executed during :code:`__call__` on the output of
:code:`run`.
By default, this serializes the output to a dictionary.
"""
return output_obj.serialize(include_class=False)
[docs] def run(self, input_obj):
"""Execution logic for the task
This method must be overridden in Task subclasses
To run a Task, create an instance of the Task, then
call the instance with the required input parameters.
This will construct and validate an Input object.
:code:`run` receives this validated Input object. It then must
process the inputs and return an Output object.
"""
raise NotImplementedError('Override in client classes')
[docs]class TaskException(Exception):
"""An exception related to a computational task"""
[docs]class TemporaryTaskFailure(TaskException):
"""An exception indicating Task should be retried"""
[docs]class PermanentTaskFailure(TaskException):
"""An exception indicating Task should not be retried"""