AWS Step Functions is a low-code, visual workflow service that developers use to build distributed applications, automate IT and business processes, and build data and machine learning pipelines on top of AWS services. Step Functions handles failures, retries, and parallelization, integrates with other AWS services, and provides observability, so developers can focus on higher-value business logic.
We will use Step Functions to orchestrate the ETL processing of the Sales and Marketing data:
We could run all of these DataBrew ETL jobs and Athena queries sequentially, but because the Sales and Marketing data are two independent datasets with no dependency on each other, we can save time by running some of the tasks in parallel:
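In Amazon States Language terms, two independent sequences like this become the branches of a Parallel state. A structural sketch only (branch contents elided; state names follow the workflow we build below):

```json
{
  "Type": "Parallel",
  "Branches": [
    { "StartAt": "Sales DataBrew ETL Job",     "States": { } },
    { "StartAt": "Marketing DataBrew ETL Job", "States": { } }
  ],
  "Next": "Drop Old Summarized Table"
}
```

Both branches must finish before the state machine proceeds to the `Next` state, which is exactly the join point we need before building the summarized table.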

To define the Step Functions state machine, we can either use the visual editor, Step Functions Workflow Studio, or define it programmatically in the Amazon States Language, expressing the definition as JSON.
Workflow Studio for AWS Step Functions is a low-code visual workflow designer that lets users create serverless workflows by orchestrating AWS services.
It provides a drag-and-drop interface for creating and editing workflows, controlling each state's input and output, and configuring error handling. As you build a workflow, Workflow Studio validates it and generates the code automatically. You can inspect the generated code, export it for local development, or export it as a CloudFormation template.
When you are done editing, you can save the workflow, run it, and then examine the results in the Step Functions console.
Open the Step Functions console and click Create state machine:

Choose the Blank template:

This opens the Workflow Studio UI, where we complete the following configuration by dragging and dropping states:

Each of the states above has its corresponding API Parameters:
| Type | Name | API Parameters | Next State |
|---|---|---|---|
| Parallel state | Refresh Sales Marketing Data | N/A | Drop Old Summarized Table |
| AWS Glue DataBrew: StartJobRun | Sales DataBrew ETL Job | { "Name": "sales-data-etl" } | Drop Old Sales Table |
| Amazon Athena: StartQueryExecution | Drop Old Sales Table | { "QueryString": "DROP TABLE IF EXISTS sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Sales Table |
| Amazon Athena: StartQueryExecution | Create Sales Table | { "QueryString": "CREATE EXTERNAL TABLE sales_data_output(date string, salesperson string, lead_name string, segment string, region string, target_close string, forecasted_monthly_revenue int, opportunity_stage string, weighted_revenue int, closed_opportunity boolean, active_opportunity boolean, latest_status_entry boolean) PARTITIONED BY (year string, month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Load Sales Table Partitions |
| Amazon Athena: StartQueryExecution | Load Sales Table Partitions | { "QueryString": "MSCK REPAIR TABLE sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Go to end |
| AWS Glue DataBrew: StartJobRun | Marketing DataBrew ETL Job | { "Name": "marketing-data-etl" } | Drop Old Marketing Table |
| Amazon Athena: StartQueryExecution | Drop Old Marketing Table | { "QueryString": "DROP TABLE IF EXISTS marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Marketing Table |
| Amazon Athena: StartQueryExecution | Create Marketing Table | { "QueryString": "CREATE EXTERNAL TABLE marketing_data_output(date string, new_visitors_seo int, new_visitors_cpc int, new_visitors_social_media int, return_visitors int, twitter_mentions int, twitter_follower_adds int, twitter_followers_cumulative int, mailing_list_adds_ int, mailing_list_cumulative int, website_pageviews int, website_visits int, website_unique_visits int, mobile_uniques int, tablet_uniques int, desktop_uniques int, free_sign_up int, paid_conversion int, events string) PARTITIONED BY (year string, month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Load Marketing Table Partitions |
| Amazon Athena: StartQueryExecution | Load Marketing Table Partitions | { "QueryString": "MSCK REPAIR TABLE marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Go to end |
| Amazon Athena: StartQueryExecution | Drop Old Summarized Table | { "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Summarized Output |
| Amazon Athena: StartQueryExecution | Create Summarized Output | { "QueryString": "CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Notify Users |
| SNS: Publish | Notify Users | Topic ARN: select the "StepFunctionWorkshopTopic" SNS topic; Message: { "Input": "Monthly sales marketing data refreshed successfully!" }; do not check "Wait for callback" | Go to end |
Note: replace every ATHENA_QUERY_BUCKET and TRANSFORMED_DATA_BUCKET in the table above with the corresponding bucket names from the CloudFormation stack outputs. For example:

The final SNS: Publish state does not require typed-in API Parameters; instead, select the topic created earlier:

Also note that "Wait for task to complete" must be checked for every task state, so that each downstream task starts only after the previous one has finished:
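In the generated definition, checking "Wait for task to complete" corresponds to the Run a Job (`.sync`) service integration pattern: the task's `Resource` ARN carries a `.sync` suffix, and Step Functions pauses the branch until the job or query actually finishes rather than returning as soon as it is submitted. A minimal illustration for one of the Athena states (the query and workgroup are the ones from the table above; `End: true` stands in for "Go to end"):

```json
{
  "Type": "Task",
  "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
  "Parameters": {
    "QueryString": "MSCK REPAIR TABLE sales_data_output",
    "WorkGroup": "StepFunctionsWorkshopWorkgroup",
    "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" }
  },
  "End": true
}
```

Without `.sync` (the "request response" pattern), the DROP/CREATE/MSCK queries would race each other, since `StartQueryExecution` itself returns immediately.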

Edit the state machine's name and set it to GlueWorkshopStudioStateMachine:

For the Execution Role, choose the one created ahead of time by CloudFormation (you can find it among the resources the CloudFormation stack created). If you instead let Step Functions generate one automatically (Create new role), the generated policy may not cover every permission this workflow needs and executions can fail with permission errors; as a quick workaround, you can attach the Administrator policy to the generated role:
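If you prefer something tighter than attaching Administrator, a non-exhaustive sketch of the kinds of actions this workflow's role needs is shown below. This is an assumption based on the services used (DataBrew job runs, Athena queries against the Glue Data Catalog, S3 for query results and transformed data, SNS publish), not the exact policy the workshop's CloudFormation stack creates; the `Resource: "*"` scoping is for illustration only and should be narrowed in practice:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "WorkflowTaskPermissionsSketch",
      "Effect": "Allow",
      "Action": [
        "databrew:StartJobRun",
        "databrew:StopJobRun",
        "databrew:ListJobRuns",
        "athena:StartQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults",
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:DeleteTable",
        "glue:BatchCreatePartition",
        "s3:GetBucketLocation",
        "s3:ListBucket",
        "s3:GetObject",
        "s3:PutObject",
        "sns:Publish"
      ],
      "Resource": "*"
    }
  ]
}
```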

Enable Logging, then click Create.
Creating the state machine with the Amazon States Language
Besides building a state machine visually in Workflow Studio, you can also import a pre-written JSON definition: the state machine above can equally be defined in JSON.
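As a hedged illustration of what that JSON looks like, here is an abbreviated Amazon States Language sketch of the workflow above, showing only the Sales branch (the Marketing branch is analogous) and only the first two states of each sequence. The bucket placeholders are the same ones to be substituted from the CloudFormation outputs, and the `.sync` suffixes reflect the "Wait for task to complete" setting:

```json
{
  "Comment": "Sketch: Sales branch only; Marketing branch and remaining Athena states are analogous",
  "StartAt": "Refresh Sales Marketing Data",
  "States": {
    "Refresh Sales Marketing Data": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Sales DataBrew ETL Job",
          "States": {
            "Sales DataBrew ETL Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::databrew:startJobRun.sync",
              "Parameters": { "Name": "sales-data-etl" },
              "Next": "Drop Old Sales Table"
            },
            "Drop Old Sales Table": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "DROP TABLE IF EXISTS sales_data_output",
                "WorkGroup": "StepFunctionsWorkshopWorkgroup",
                "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Drop Old Summarized Table"
    },
    "Drop Old Summarized Table": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue",
        "WorkGroup": "StepFunctionsWorkshopWorkgroup",
        "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" }
      },
      "End": true
    }
  }
}
```

Each row of the API Parameters table maps one-to-one onto a task state like these: the Type column becomes the `Resource` ARN, the API Parameters column becomes `Parameters`, and the Next State column becomes `Next` (or `End: true` for "Go to end").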