Openstack taskflow介绍

前言

TaskFlow是OpenStack中的一个Python库,主要目的是让task(任务)执行更加容易可靠,能将轻量的任务对象组织成一个有序的流。 若未安装taskflow到环境中:

pip install taskflow

目前TaskFlow支持三种模式:

  • 线性:运行一个任务或流的列表,是一个接一个串行方式运行。
  • 无序:运行一个任务或流的列表,以并行的方式运行,顺序与列表顺序无关,任务之间不存在依赖关系。
  • 图:运行一个图标(组节点和边缘节点)之间组成的任务/流依赖驱动的顺序。

任务的状态

就像任何其他的任务流系统一样,每个任务都有一些状态:

  • PENDING
  • RUNNING
  • SUCCESS
  • FAILURE

你也可以创建自定义的状态。

举个例子

线性:

#!/usr/bin/env python
# coding=utf-8

from __future__ import print_function
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class A(task.Task):
    def execute(self, a_msg, *args, **kwargs):
        print('A : {}' . format(a_msg))


class B(task.Task):
    def execute(self, b_msg, *args, **kwargs):
        print('B : {}' . format(b_msg))

flow = lf.Flow('simple-linear-listen').add(
    A(),
    B()
    )

engine = taskflow.engines.load(flow, store=dict(a_msg='a', b_msg='b'))
engine.run()

Output:

A : a
B : b

说明:A任务永远都会在B任务之前。 检查任务状态 修改代码:

#!/usr/bin/env python
# coding=utf-8

from __future__ import print_function
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


def flow_watch(state, details):
    print("Flow State:{}".format(state))
    print("Flow Details:{}".format(details))


class A(task.Task):
    def execute(self, a_msg, *args, **kwargs):
        print('A:{}' . format(a_msg))


class B(task.Task):
    def execute(self, b_msg, *args, **kwargs):
        print('B:{}' . format(b_msg))

flow = lf.Flow('simple-linear-listen').add(
    A(),
    B()
    )

engine = taskflow.engines.load(flow, store = dict(a_msg = 'a', b_msg = 'b'))
engine.notifier.register('*', flow_watch)
engine.run()

注册了一个监听器将报告给flow_wtach函数。 Output:

Flow State:RUNNING
Flow Details:{'engine': <taskflow.engines.action_engine.engine.SerialActionEngine object at 0x0333A2D0>, 'old_state': 'PENDING', 'flow_name': u'simple-linear-listen', 'flow_uuid': '59385218-0fc8-4566-a308-17b0d69cf8b2'}
A:a
B:b
Flow State:SUCCESS
Flow Details:{'engine': <taskflow.engines.action_engine.engine.SerialActionEngine object at 0x0333A2D0>, 'old_state': 'RUNNING', 'flow_name': u'simple-linear-listen', 'flow_uuid': '59385218-0fc8-4566-a308-17b0d69cf8b2'}

当流状态发生改变,就会被捕捉到,若只监听流状态,也可以改为:

engine.notifier.register('SUCCESS', flow_watch)

也可以做到监听任务:

任务异常

在一组任务中,若其中一个发生异常,流的任务失败,就需要处理异常工作:

#!/usr/bin/env python
# coding=utf-8

from __future__ import print_function
import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class A(task.Task):
    def execute(self, a_msg, *args, **kwargs):
        print('A : {}' . format(a_msg))

    def revert(self, a_msg, *args, **kwargs):
        print('A {} revert' . format(a_msg))


class B(task.Task):
    def execute(self, b_msg, *args, **kwargs):
        print('B : {}' . format(b_msg))

    def revert(self, b_msg, *args, **kwargs):
        print('B {} revert' .format(b_msg))


class C(task.Task):
    def execute(self, c_msg, *args, **kwargs):
        print('C : {}' . format(c_msg))
        raise IOError('C IOError')


flow = lf.Flow('simple-linear-listen').add(
    A(),
    B(),
    C()
    )

engine = taskflow.engines.load(flow, store = dict(a_msg = 'a', b_msg = 'b',c_msg = 'c'))
try:
    engine.run()
except Exception as e:
    print("flow failed:{}" .format(e))

Output:

A : a
B : b
C : c
B b revert
A a revert
flow failed:C IOError

说明,如果出现异常,会执行revert函数进行清理工作。 相关链接:TaskFlow维基