Source code for taran.foreman
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" The Foreman class - An abstraction of the AWS SWF Decider operations """
from __future__ import (absolute_import, print_function, unicode_literals)
import json
import uuid
from collections import namedtuple
from botocore.exceptions import ClientError
from contracts import contract
from taran import Taran
from taran.helpers.aws.swf import (get_activity_version)
# Immutable description of a single activity the foreman asks SWF to schedule.
# As consumed by Foreman.schedule_activity_tasks:
#   name                      -> activityType name
#   type                      -> passed to get_activity_version() to resolve the activityType version
#   schedule_to_start_timeout -> scheduleToStartTimeout
#   start_to_close_timeout    -> startToCloseTimeout
#   schedule_to_close_timeout -> scheduleToCloseTimeout
#   task_list                 -> name of the task list the activity is placed on
#   input                     -> input handed to the activity
Decision = namedtuple(
    'Decision',
    'name type schedule_to_start_timeout start_to_close_timeout '
    'schedule_to_close_timeout task_list input'
)
class Foreman(Taran):
    """A template for all decision processors.

    Attributes:
        task_list (str): Name of the SWF task list polled for decision tasks. Taken from
            ``configuration.FOREMAN_TASK_LIST`` when present, otherwise ``'default'``.
        decision_task (dict): The decision task returned by the last successful poll.
        workflow_id (str): ``workflowId`` of the execution the current decision task belongs to.
        run_id (str): ``runId`` of that execution.
        task_token (str): Token of the current decision task; needed to respond to SWF.
        workflow_name (str): Workflow type name of the execution.
        workflow_version (str): Workflow type version of the execution.
        workflow_history (dict): After a poll, a dict with the keys ``events``,
            ``next_page_token`` and ``previous_started_event_id``.

    Note:
        ``swf_client``, ``domain_name``, ``identity``, ``activity_list``, ``msg`` and
        ``get_activity_history`` are used but not defined in this class; they are presumably
        supplied by ``Taran`` -- confirm against the base class.
    """

    @contract(configuration='*')
    def __init__(self, configuration=None):
        """Initialise state that applies to all foreman instances.

        Args:
            configuration (module): The configuration a foreman needs in order to participate in the
                workflow. It is forwarded unchanged to the ``Taran`` initialiser.
        """
        super(Foreman, self).__init__(configuration=configuration)
        # Fall back to the 'default' task list when the configuration does not name one.
        self.task_list = getattr(configuration, 'FOREMAN_TASK_LIST', 'default')

    def poll_for_decision_task(self):
        """Poll SWF for a decision task and record its details on the instance.

        On success the task, its token, the workflow/run ids, the workflow type name and version,
        and the first page of history are stored on ``self``.

        Returns:
            bool: ``True`` when a task with a non-empty task token was received; ``False`` when the
            poll returned no task or a (non access-denied) client error occurred.

        Raises:
            SystemExit: When SWF reports an ``AccessDeniedException``.
        """
        try:
            task = self.swf_client.poll_for_decision_task(domain=self.domain_name,
                                                          identity=self.identity,
                                                          taskList={'name': self.task_list})
            # A poll that times out carries no usable token (missing or empty string).
            if not task or not task.get('taskToken'):
                return False

            self.decision_task = task
            self.workflow_id = task['workflowExecution']['workflowId']
            self.run_id = task['workflowExecution']['runId']
            self.task_token = task['taskToken']

            # Set workflow name and version.
            workflow_execution = self.swf_client.describe_workflow_execution(
                domain=self.domain_name,
                execution={'workflowId': self.workflow_id, 'runId': self.run_id}
            )
            workflow_type = workflow_execution['executionInfo']['workflowType']
            self.workflow_name = workflow_type['name']
            self.workflow_version = workflow_type['version']

            self.workflow_history = dict(events=task['events'],
                                         next_page_token=task.get('nextPageToken'),
                                         previous_started_event_id=task.get('previousStartedEventId'))
            return True
        except ClientError as ce:
            error_code = ce.response.get('Error', {}).get('Code', '')
            if 'AccessDeniedException' in error_code:
                self.msg(message='Insufficient privileges to poll for decision', level='error')
                # Raise SystemExit directly: the `exit` builtin is only injected by the `site`
                # module, and an error exit should not report status 0.
                raise SystemExit(1)
            # Other client errors used to be dropped silently; report them and signal "no task".
            self.msg(message='Unable to retrieve a decision task: {0}'.format(ce), level='error')
            return False

    def get_workflow_history(self):
        """Collect every page of the workflow history into ``self.workflow_history['events']``.

        If a history is already cached on the instance it is used as the first page; otherwise the
        first page is requested with ``get_workflow_execution_history`` (newest event first).
        Remaining pages are fetched with the same API that produced the first page, because a
        page token issued by ``poll_for_decision_task`` can only be continued by polling again
        with that token.

        NOTE(review): a history cached by ``poll_for_decision_task`` was requested without
        ``reverseOrder`` while a history fetched here uses ``reverseOrder=True``, so the event
        order depends on where the first page came from -- confirm callers do not rely on one order.

        Returns:
            dict: The cached workflow history, its ``events`` entry holding the complete event list.

        Raises:
            SystemExit: With the message ``'No response'`` when SWF returns an empty response.
        """
        execution = {'workflowId': self.workflow_id, 'runId': self.run_id}
        history = getattr(self, 'workflow_history', None)

        if not history:
            history = self.swf_client.get_workflow_execution_history(domain=self.domain_name,
                                                                     execution=execution,
                                                                     maximumPageSize=1000,
                                                                     reverseOrder=True)
            if not history:
                # Previously an empty first response made the loop spin forever.
                raise SystemExit('No response')
            self.workflow_history = history

        # poll_for_decision_task() stores its token under 'next_page_token'; a raw
        # get_workflow_execution_history response carries it under 'nextPageToken'.
        from_poll = 'next_page_token' in history
        token_key = 'next_page_token' if from_poll else 'nextPageToken'
        next_page_token = history.get(token_key)

        events = list(history.get('events') or [])
        while next_page_token:
            if from_poll:
                # Polling with a nextPageToken returns the next history page of the same task,
                # not a new decision task.
                page = self.swf_client.poll_for_decision_task(domain=self.domain_name,
                                                              identity=self.identity,
                                                              taskList={'name': self.task_list},
                                                              nextPageToken=next_page_token)
            else:
                page = self.swf_client.get_workflow_execution_history(domain=self.domain_name,
                                                                      execution=execution,
                                                                      nextPageToken=next_page_token,
                                                                      maximumPageSize=1000,
                                                                      reverseOrder=True)
            if not page:
                raise SystemExit('No response')
            events.extend(page.get('events') or [])
            next_page_token = page.get('nextPageToken')

        history['events'] = events
        # Everything is consumed; clear the token so a repeated call does not append duplicates.
        history[token_key] = None
        return history

    @contract(decisions='list')
    def schedule_activity_tasks(self, decisions=None):
        """Complete the current decision task by scheduling the given activities.

        Args:
            decisions (List): ``Decision`` tuples describing the activities to schedule. Each one
                becomes a ``ScheduleActivityTask`` decision with a freshly generated activity id.

        Returns:
            bool: ``True`` once SWF has accepted the decisions.

        Raises:
            botocore.exceptions.ClientError: Propagated unchanged when SWF rejects the response.
        """
        decisions_to_schedule = [
            {
                'decisionType': 'ScheduleActivityTask',
                'scheduleActivityTaskDecisionAttributes': {
                    'activityType': {
                        'name': decision.name,
                        'version': get_activity_version(activity_type=decision.type,
                                                        activity_list=self.activity_list),
                    },
                    'scheduleToStartTimeout': decision.schedule_to_start_timeout,
                    'startToCloseTimeout': decision.start_to_close_timeout,
                    'scheduleToCloseTimeout': decision.schedule_to_close_timeout,
                    'activityId': str(uuid.uuid1()),
                    'taskList': {'name': decision.task_list},
                    'input': decision.input,
                },
            }
            for decision in decisions
        ]
        self.swf_client.respond_decision_task_completed(taskToken=self.task_token,
                                                        decisions=decisions_to_schedule)
        return True

    def get_activity_results(self, activity=None):
        """Get the results returned by the completed runs of an activity.

        Args:
            activity: Activity type forwarded to ``get_activity_history`` as ``activity_type``.

        Returns:
            list: One entry per history item whose ``status`` is ``'completed'``: the JSON-decoded
            ``result``, or ``None`` when the activity completed without a result. Empty when there
            is no matching history.
        """
        activity_history = self.get_activity_history(workflow_history=self.workflow_history,
                                                     activity_type=activity)
        results_list = []
        for activity_event in activity_history or ():
            if activity_event.get('status') != 'completed':
                continue
            raw_result = activity_event.get('result')
            # A completed activity may carry no result; json.loads(None) would raise TypeError.
            results_list.append(json.loads(raw_result) if raw_result else None)
        return results_list