Source code for taran.starter

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""This module provides a class that abstracts the configuration and the SWF 'start_workflow_execution' operation.
"""
from __future__ import (print_function, unicode_literals)

import uuid
from datetime import datetime

from botocore.exceptions import ClientError
from contracts import contract
from six import text_type

from taran import Taran


[docs]class Starter(Taran): """Class that defines instances of starter that are used to perform checks and then execute a workflow.""" @contract(configuration='*') def __init__(self, configuration=None): super(Starter, self).__init__(configuration=configuration) self.workflow_input = '-' self.foreman_task_list = configuration.FOREMAN_TASK_LIST if hasattr(configuration, 'FOREMAN_TASK_LIST') else 'default' self.allow_parallel_exec = configuration.ALLOW_PARALLEL_EXEC if hasattr(configuration, 'ALLOW_PARALLEL_EXEC') else False
[docs] def ensure_domain_exists(self, domain_name=None): """Return true if specified domain exists, otherwise create it.""" prefix = 'Registering domain \'{0}\':'.format(domain_name) try: self.swf_client.register_domain( name=domain_name, description='{0} domain'.format(domain_name), workflowExecutionRetentionPeriodInDays='10' ) self.msg(message='{0} [REGISTERED]'.format(prefix)) return True except ClientError as ce: if 'DomainAlreadyExistsFault' in ce.response['Error']['Code']: self.msg(message='{0} [OK]'.format(prefix)) return True else: raise except: raise
[docs] def start_workflow(self): """Start the workflow.""" self.msg(message='Request received to start \'{0}\' workflow'.format(self.workflow_name)) open_workflow_executions = self.swf_client.list_open_workflow_executions( domain=self.domain_name, startTimeFilter={ 'oldestDate': datetime(2016, 1, 1) }, typeFilter={ 'name': self.workflow_name, 'version': self.workflow_version }, ) if not self.allow_parallel_exec and open_workflow_executions.get('executionInfos'): self.msg(message='Cannot start workflow as an existing instance is already running', level='warning') exit(1) self.msg(message='Checking registration of workflow and activity types:') self.ensure_domain_exists(domain_name=self.domain_name) self.ensure_workflow_type_exists( workflow_name=self.workflow_name, workflow_version=self.workflow_version) for activity in self.activity_list: self.ensure_activity_type_exists( activity_name=activity.get('name'), activity_version=activity.get('version'), activity_task_list=activity.get('task_list')) self.workflow_id = text_type(uuid.uuid1())[:13] self.msg(message='Starting workflow execution') start_result = self.swf_client.start_workflow_execution(domain=self.domain_name, workflowType={'name': self.workflow_name, 'version': self.workflow_version}, executionStartToCloseTimeout='3600', input=self.workflow_input, taskStartToCloseTimeout='10', workflowId=self.workflow_id, taskList={ 'name': self.foreman_task_list}) self.run_id = start_result['runId'] self.msg(message='Started workflow execution') return {'run_id': self.run_id, 'workflow_id': self.workflow_id}
# TODO: Determine how default task list should be set - defined in config? @contract(workflow_name='unicode', workflow_version='unicode')
[docs] def ensure_workflow_type_exists(self, workflow_name=None, workflow_version=None): """Check the workflow type exists and create it if it doesn't.""" prefix = 'Registering workflow type \'{0}\':'.format(workflow_name) try: self.swf_client.register_workflow_type( domain=self.domain_name, name=workflow_name, version=workflow_version, defaultTaskList={ 'name': 'default' }, defaultTaskStartToCloseTimeout='500', defaultExecutionStartToCloseTimeout='500', defaultChildPolicy='TERMINATE', ) self.msg(message='{0} [REGISTERED]'.format(prefix)) return True except ClientError as ce: if 'TypeAlreadyExistsFault' in ce.response['Error']['Code']: self.msg(message='{0} [OK]'.format(prefix)) return True else: raise
@contract(activity_name='unicode', activity_version='unicode', activity_task_list='unicode')
[docs] def ensure_activity_type_exists(self, activity_name=None, activity_version=None, activity_task_list=None): """Check the activity type exists and create it if it doesn't.""" prefix = 'Registering activity type \'{0}\':'.format(activity_name) try: self.swf_client.register_activity_type( domain=self.domain_name, name=activity_name, version=activity_version, defaultTaskStartToCloseTimeout='60', defaultTaskHeartbeatTimeout='600', defaultTaskList={ 'name': activity_task_list }, defaultTaskScheduleToStartTimeout='60', defaultTaskScheduleToCloseTimeout='60' ) self.msg(message='{0} [REGISTERED]'.format(prefix)) return True except ClientError as ce: if 'TypeAlreadyExistsFault' in ce.response['Error']['Code']: self.msg(message='{0} [OK]'.format(prefix)) return True else: raise