# Source code for taran.worker
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module provides worker/actor specific methods to all child classes.
"""
from __future__ import (absolute_import, print_function, unicode_literals)
import json
from botocore.exceptions import ClientError
from contracts import contract
from taran import Taran
class Worker(Taran):
    """Base class for workers that poll for and respond to SWF activity tasks.

    Attributes:
        configuration (module): The configuration a worker needs in order to
            participate in the workflow.
    """

    @contract(configuration='*')
    def __init__(self, configuration=None):
        """Initialise state that applies to all workers.

        Args:
            configuration (module): The configuration a worker needs in order
                to participate in the workflow.
        """
        super(Worker, self).__init__(configuration=configuration)

    def poll_for_activity_task(self):
        """Poll SWF for an activity task and record its details on the worker.

        When a task carrying a non-empty task token is received, the task, its
        activity type name, workflow id, run id and task token are stored on
        the instance, and the workflow type name and version are looked up
        with ``describe_workflow_execution`` and stored as well.

        Returns:
            dict: Details of the assigned task, or ``None`` when the poll
            produced no task or a ``ClientError`` was logged.

        Raises:
            SystemExit: If SWF answers with an ``AccessDeniedException``.
        """
        self.msg(message='Polling for task routed to: ({0})...'.format(self.task_list))
        try:
            task = self.swf_client.poll_for_activity_task(
                domain=self.domain_name,
                taskList={'name': self.task_list},
                identity=self.identity
            )
            # A poll that finds no work may come back without a token or with
            # an empty one; neither describes a task that can be processed.
            if not task or not task.get('taskToken'):
                return None

            self.activity_task = task
            self.activity_type_name = task['activityType']['name']
            self.workflow_id = task['workflowExecution']['workflowId']
            self.run_id = task['workflowExecution']['runId']
            self.task_token = task['taskToken']

            # SET WORKFLOW NAME AND VERSION
            workflow_execution = self.swf_client.describe_workflow_execution(
                domain=self.domain_name,
                execution={
                    'workflowId': self.workflow_id,
                    'runId': self.run_id
                }
            )
            workflow_type = workflow_execution['executionInfo']['workflowType']
            self.workflow_name = workflow_type['name']
            self.workflow_version = workflow_type['version']
            return task
        except ClientError as ce:
            if 'AccessDeniedException' in ce.response['Error']['Code']:
                self.msg(message='Insufficient privileges to poll for task', level='error')
                # Same effect as the interactive ``exit()`` helper (exit
                # status unchanged) without depending on the ``site`` module.
                raise SystemExit()
            # Any other client error is reported instead of being dropped
            # silently; the caller simply sees "no task" for this poll.
            self.msg(message='Unable to poll for task: {0}'.format(ce), level='error')
            return None

    def get_activity_results(self, activity=None):
        """Get a list of all results (when activity completed).

        Args:
            activity: Activity type handed to ``get_activity_history`` as
                ``activity_type``.

        Returns:
            list: One entry per history event whose status is ``'completed'``:
            the JSON-decoded result, or the raw result when it cannot be
            decoded as JSON.
        """
        activity_history = self.get_activity_history(workflow_history=self.workflow_history,
                                                     activity_type=activity)
        results_list = []
        for activity_event in activity_history or ():
            if activity_event.get('status') != 'completed':
                continue
            raw_result = activity_event.get('result')
            try:
                results_list.append(json.loads(raw_result))
            except (TypeError, ValueError):
                # The result may be missing or not JSON (for instance the
                # 'Undefined' default of complete_activity_task); keep the
                # raw value rather than failing the whole collection.
                results_list.append(raw_result)
        return results_list

    def complete_activity_task(self, result='Undefined'):
        """Signal activity task as complete.

        Client errors are logged through ``self.msg`` and not re-raised.

        Args:
            result: Value sent to SWF as the result of the current task
                (identified by ``self.task_token``).
        """
        try:
            self.swf_client.respond_activity_task_completed(taskToken=self.task_token,
                                                            result=result)
        except ClientError as ce:
            if 'UnknownResourceFault' in ce.response['Error']['Code']:
                self.msg(message='Unable to complete activity task as Workflow'
                                 ' execution does not exist (already terminated?)')
            else:
                # Report unexpected failures instead of hiding them.
                self.msg(message='Unable to complete activity task: {0}'.format(ce),
                         level='error')

    def activity_task_failed(self, reason=None, details=None):
        """Signal that activity task failed.

        Client errors are logged through ``self.msg`` and not re-raised.

        Args:
            reason: Optional failure reason; omitted from the request when
                ``None``.
            details: Optional failure details; omitted from the request when
                ``None``.
        """
        request = {'taskToken': self.task_token}
        # Only forward the optional fields that were actually supplied:
        # boto3 rejects None for string parameters before the call is made.
        if reason is not None:
            request['reason'] = reason
        if details is not None:
            request['details'] = details
        try:
            self.swf_client.respond_activity_task_failed(**request)
        except ClientError as ce:
            self.msg(message=str(ce), level='error')