Creating a Step Functions State Machine

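AWS Step Functions is a low-code, visual workflow service that developers use to build distributed applications, automate IT and business processes, and build data and machine learning pipelines with AWS services. Step Functions handles failures, retries, parallelization, integration with other AWS services, and observability, so developers can focus on higher-value business logic.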

We will use Step Functions to orchestrate the ETL processing of the Sales and Marketing data:

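  1. Run the Sales and Marketing DataBrew ETL jobs
  2. Drop the old Sales and Marketing tables
  3. Create the new Sales and Marketing tables
  4. Load data into the Sales and Marketing tables
  5. Drop the summary output table
  6. Create the summary output table
  7. Notify users that the ETL run has completed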

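We could run all of these tasks sequentially, but Sales and Marketing are two independent datasets with no dependency on each other, so we can save time by running some of the tasks in parallel:

Figure: Completed Step Function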
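As a rough sketch of that shape, the Python below builds a skeleton of the workflow: one Parallel state with a Sales branch and a Marketing branch, followed by the summary and notification steps. The state names come from the table later in this section; the `chain` helper and the `Pass` placeholder states are assumptions made for illustration only, since the real states are Task states with API parameters.

```python
import json


def chain(names):
    """Build a branch whose states run in the given order.

    Each state is a placeholder Pass state (an assumption for this sketch);
    the real workflow uses Task states.
    """
    states = {}
    for i, name in enumerate(names):
        state = {"Type": "Pass"}
        if i + 1 < len(names):
            state["Next"] = names[i + 1]  # hand over to the following state
        else:
            state["End"] = True  # last state of the branch
        states[name] = state
    return {"StartAt": names[0], "States": states}


sales_branch = chain([
    "Sales DataBrew ETL Job",
    "Drop Old Sales Table",
    "Create Sales Table",
    "Load Sales Table Partitions",
])
marketing_branch = chain([
    "Marketing DataBrew ETL Job",
    "Drop Old Marketing Table",
    "Create Marketing Table",
    "Load Marketing Table Partitions",
])

definition = {
    "StartAt": "Refresh Sales Marketing Data",
    "States": {
        # Both branches run concurrently; the Parallel state finishes
        # only when every branch has finished.
        "Refresh Sales Marketing Data": {
            "Type": "Parallel",
            "Branches": [sales_branch, marketing_branch],
            "Next": "Drop Old Summerized Table",
        },
        "Drop Old Summerized Table": {"Type": "Pass", "Next": "Create Summerized Output"},
        "Create Summerized Output": {"Type": "Pass", "Next": "Notify Users"},
        "Notify Users": {"Type": "Pass", "End": True},
    },
}

print(json.dumps(definition, indent=2))
```

Only the per-dataset steps go inside the Parallel state. The summary table joins both datasets, so it has to wait until both branches are done.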

To define a Step Functions state machine, we can either use the visual editor, Step Functions Workflow Studio, or define it programmatically in Amazon States Language, which expresses the definition as JSON.

Creating the State Machine with Step Functions Workflow Studio

Workflow Studio for AWS Step Functions is a low-code visual workflow designer for Step Functions that lets users create serverless workflows by orchestrating AWS services.

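It uses drag and drop to create and edit workflows, control the input and output of each state, and configure error handling. As you build a workflow, Workflow Studio validates it and generates the code automatically. You can view the generated code, or export it for local development or as a CloudFormation template.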

When you are done editing, you can save the workflow, run it, and then inspect the results in the Step Functions console.

Open the Step Functions page and click Create state machine.

Choose the Blank template.

This opens the Workflow Studio UI. Drag and drop states onto the canvas to build the workflow, configuring each state as listed in the table below.

Each state has its own API Parameters:

| Type | Name | API Parameters | Next State |
| --- | --- | --- | --- |
| Parallel state | Refresh Sales Marketing Data | N/A | Drop Old Summerized Table |
| AWS Glue DataBrew: StartJobRun | Sales DataBrew ETL Job | `{ "Name": "sales-data-etl" }` | Drop Old Sales Table |
| Amazon Athena: StartQueryExecution | Drop Old Sales Table | `{ "QueryString": "DROP TABLE IF EXISTS sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Create Sales Table |
| Amazon Athena: StartQueryExecution | Create Sales Table | `{ "QueryString": "CREATE EXTERNAL TABLE sales_data_output(date string, salesperson string, lead_name string, segment string, region string, target_close string, forecasted_monthly_revenue int, opportunity_stage string, weighted_revenue int, closed_opportunity boolean, active_opportunity boolean, latest_status_entry boolean) PARTITIONED BY (year string,month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Load Sales Table Partitions |
| Amazon Athena: StartQueryExecution | Load Sales Table Partitions | `{ "QueryString": "MSCK REPAIR TABLE sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Go to end |
| AWS Glue DataBrew: StartJobRun | Marketing DataBrew ETL Job | `{ "Name": "marketing-data-etl" }` | Drop Old Marketing Table |
| Amazon Athena: StartQueryExecution | Drop Old Marketing Table | `{ "QueryString": "DROP TABLE IF EXISTS marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Create Marketing Table |
| Amazon Athena: StartQueryExecution | Create Marketing Table | `{ "QueryString": "CREATE EXTERNAL TABLE marketing_data_output(date string, new_visitors_seo int, new_visitors_cpc int, new_visitors_social_media int, return_visitors int, twitter_mentions int, twitter_follower_adds int, twitter_followers_cumulative int, mailing_list_adds_ int, mailing_list_cumulative int, website_pageviews int, website_visits int, website_unique_visits int, mobile_uniques int, tablet_uniques int, desktop_uniques int, free_sign_up int, paid_conversion int, events string) PARTITIONED BY (year string, month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Load Marketing Table Partitions |
| Amazon Athena: StartQueryExecution | Load Marketing Table Partitions | `{ "QueryString": "MSCK REPAIR TABLE marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Go to end |
| Amazon Athena: StartQueryExecution | Drop Old Summerized Table | `{ "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Create Summerized Output |
| Amazon Athena: StartQueryExecution | Create Summerized Output | `{ "QueryString": "CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } }` | Notify Users |
| SNS: Publish | Notify Users | Topic ARN: select the "StepFunctionWorkshopTopic" SNS topic. Message: `{ "Input": "Monthly sales marketing data refreshed successfully!" }`. Do not check "Wait for callback". | Go to end |

Be sure to replace every ATHENA_QUERY_BUCKET and TRANSFORMED_DATA_BUCKET in the table above with the corresponding bucket names from the CloudFormation outputs.
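If you keep the parameters in a text file rather than retyping them, the substitution can be scripted. The sketch below is one possible approach and not part of the workshop: `fill_buckets` is a hypothetical helper, and the two `example-...` bucket names stand in for the real values from your CloudFormation outputs.

```python
import json

# API Parameters for the "Drop Old Sales Table" state, copied from the table.
TEMPLATE = """{
  "QueryString": "DROP TABLE IF EXISTS sales_data_output",
  "WorkGroup": "StepFunctionsWorkshopWorkgroup",
  "ResultConfiguration": {"OutputLocation": "s3://ATHENA_QUERY_BUCKET/"}
}"""


def fill_buckets(template, athena_query_bucket, transformed_data_bucket):
    """Substitute both bucket placeholders, then parse the result as JSON.

    json.loads raises ValueError if the substituted text is not valid JSON,
    which catches stray quotes before they reach the console.
    """
    filled = (
        template
        .replace("ATHENA_QUERY_BUCKET", athena_query_bucket)
        .replace("TRANSFORMED_DATA_BUCKET", transformed_data_bucket)
    )
    return json.loads(filled)


# Hypothetical bucket names; use the ones from your CloudFormation outputs.
params = fill_buckets(
    TEMPLATE,
    "example-athena-query-bucket",
    "example-transformed-data-bucket",
)
print(params["ResultConfiguration"]["OutputLocation"])
# prints: s3://example-athena-query-bucket/
```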

The final SNS: Publish state does not need API Parameters entered as JSON; instead, select the topic created earlier.
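For orientation, the notification state might look roughly like the following in Amazon States Language, built here as a Python dictionary. The `notify_state` helper and the region and account ID in the example ARN are placeholders invented for this sketch, and the actual topic name in your account may carry a CloudFormation-generated suffix. Leaving "Wait for callback" unchecked corresponds to the plain `sns:publish` resource without a `.waitForTaskToken` suffix.

```python
import json


def notify_state(topic_arn):
    """Build a Task state that publishes the completion message to SNS."""
    return {
        "Type": "Task",
        # Plain request/response call: the workflow does not wait for a callback.
        "Resource": "arn:aws:states:::sns:publish",
        "Parameters": {
            "TopicArn": topic_arn,
            "Message": {
                "Input": "Monthly sales marketing data refreshed successfully!"
            },
        },
        "End": True,  # corresponds to "Go to end" in the table
    }


# Placeholder ARN (region and account ID are made up for illustration).
state = notify_state("arn:aws:sns:us-east-1:123456789012:StepFunctionWorkshopTopic")
print(json.dumps({"Notify Users": state}, indent=2))
```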

Also make sure Wait for task to complete is checked on every DataBrew and Athena state, so that each task starts only after the previous one has finished.
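In the generated Amazon States Language, this checkbox shows up as a `.sync` suffix on the task's resource ARN. The sketch below illustrates the difference; `task_state` is a hypothetical helper written for this example, not something Workflow Studio provides.

```python
def task_state(integration, parameters, next_state=None, wait_for_completion=True):
    """Build a Task state for an optimized service integration.

    integration is the "service:action" part of the resource ARN, for example
    "athena:startQueryExecution". With wait_for_completion=True the ARN gets
    the ".sync" suffix, so Step Functions waits for the job or query to finish
    before moving on.
    """
    resource = f"arn:aws:states:::{integration}"
    if wait_for_completion:
        resource += ".sync"
    state = {"Type": "Task", "Resource": resource, "Parameters": parameters}
    if next_state is None:
        state["End"] = True
    else:
        state["Next"] = next_state
    return state


job = task_state(
    "databrew:startJobRun",
    {"Name": "sales-data-etl"},
    next_state="Drop Old Sales Table",
)
print(job["Resource"])
# prints: arn:aws:states:::databrew:startJobRun.sync

fire_and_forget = task_state(
    "databrew:startJobRun",
    {"Name": "sales-data-etl"},
    next_state="Drop Old Sales Table",
    wait_for_completion=False,
)
print(fire_and_forget["Resource"])
# prints: arn:aws:states:::databrew:startJobRun
```

Without the suffix, the state succeeds as soon as the job or query has been started, so the next state could try to drop or create a table while the ETL job is still running.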

Edit the state machine name and set it to GlueWorkshopStudioStateMachine.

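For Execution Role, select the role that CloudFormation created in advance (you can find it among the resources of the CloudFormation stack). If you let Step Functions create one for you (Create new role), the generated role may not include every permission the workflow needs, and the execution can fail with a permissions error. In that case, you can work around it by attaching Administrator permissions to the generated role.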

Enable Logging, then click Create.

Creating the State Machine with Amazon States Language

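Besides building the state machine visually in Workflow Studio, you can import a JSON definition that has already been written. The state machine above can be defined entirely in JSON.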
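To give a feel for what such JSON looks like, here is an abbreviated fragment embedded in a short Python script that checks it before you paste it into the console. This is a sketch, not the workshop's actual definition: it covers only the tail of the workflow and skips the Parallel state and Create Summerized Output, so Drop Old Summerized Table points straight at Notify Users. The topic ARN is a placeholder and `check_transitions` is a hypothetical helper.

```python
import json

ASL_TEXT = """
{
  "Comment": "Abbreviated sketch: tail of the workflow only",
  "StartAt": "Drop Old Summerized Table",
  "States": {
    "Drop Old Summerized Table": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue",
        "WorkGroup": "StepFunctionsWorkshopWorkgroup",
        "ResultConfiguration": {"OutputLocation": "s3://ATHENA_QUERY_BUCKET/"}
      },
      "Next": "Notify Users"
    },
    "Notify Users": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:REGION:ACCOUNT_ID:StepFunctionWorkshopTopic",
        "Message": {"Input": "Monthly sales marketing data refreshed successfully!"}
      },
      "End": true
    }
  }
}
"""


def check_transitions(machine):
    """Return a list of problems: a StartAt or Next that names no defined state."""
    states = machine["States"]
    problems = []
    if machine["StartAt"] not in states:
        problems.append(f"StartAt points at undefined state {machine['StartAt']!r}")
    for name, state in states.items():
        target = state.get("Next")
        if target is not None and target not in states:
            problems.append(f"{name!r} points at undefined state {target!r}")
        # Each branch of a Parallel state is a small state machine of its own.
        for branch in state.get("Branches", []):
            problems.extend(check_transitions(branch))
    return problems


machine = json.loads(ASL_TEXT)  # fails loudly if the JSON itself is malformed
print(check_transitions(machine))
# prints: []
```

The check only covers transitions between states. The Step Functions console still validates the full definition when you import it.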
