AWS Step Functions
Step Functions is a low-code, visual workflow service that developers can use to build distributed applications, automate IT and business processes, and build data and machine learning pipelines from AWS services. Step Functions handles failures, provides retries, parallelizes processing, integrates with other AWS services, and offers observability, so developers can focus on higher-value business logic.
We will use Step Functions to orchestrate the ETL processing of the Sales and Marketing data via DataBrew ETL jobs. We could run all of these tasks sequentially, but since the Sales and Marketing data are two independent datasets with no dependency on each other, we can save some time by running those tasks in parallel:
To define the Step Functions state machine, we can use the visual editor, Step Functions Workflow Studio, or define it programmatically with the Amazon States Language (ASL), which expresses the definition as JSON.
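To give a feel for ASL before we build the real workflow, here is a minimal sketch of the shape we are aiming for: a Parallel state with one branch per dataset, using Pass placeholders where the real DataBrew and Athena tasks will go (state names here are illustrative only):

```json
{
  "Comment": "Shape of the workflow: refresh both datasets in parallel, then summarize",
  "StartAt": "Refresh Sales Marketing Data",
  "States": {
    "Refresh Sales Marketing Data": {
      "Type": "Parallel",
      "Branches": [
        { "StartAt": "Sales Pipeline", "States": { "Sales Pipeline": { "Type": "Pass", "End": true } } },
        { "StartAt": "Marketing Pipeline", "States": { "Marketing Pipeline": { "Type": "Pass", "End": true } } }
      ],
      "Next": "Summarize"
    },
    "Summarize": { "Type": "Pass", "End": true }
  }
}
```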
Workflow Studio for AWS Step Functions is a low-code visual workflow designer for Step Functions that lets users create serverless workflows by orchestrating AWS services. You create and edit workflows by drag and drop, control the input and output of each state, and configure error handling. As you build the workflow, Workflow Studio validates it and generates the code automatically. You can review the generated code, export it for local development, or export it as CloudFormation. Once editing is done, you can save the workflow, run it, and inspect the results in the Step Functions console.
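For reference, a state machine exported to CloudFormation is declared as an AWS::StepFunctions::StateMachine resource whose DefinitionString holds the ASL JSON. A minimal sketch, where the role ARN, account ID, and the trivial one-state definition are placeholders:

```json
{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Resources": {
    "EtlStateMachine": {
      "Type": "AWS::StepFunctions::StateMachine",
      "Properties": {
        "RoleArn": "arn:aws:iam::111122223333:role/StepFunctionsExecutionRole",
        "DefinitionString": "{\"StartAt\":\"Done\",\"States\":{\"Done\":{\"Type\":\"Succeed\"}}}"
      }
    }
  }
}
```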
Go to the Step Functions page and click Create state machine:

Select the Blank type:

This takes you into the Workflow Studio UI, where we complete the following configuration by drag and drop:
Each state above has its corresponding API Parameters:
Type | Name | API Parameters | Next State |
---|---|---|---|
Parallel state | Refresh Sales Marketing Data | N/A | Drop Old Summarized Table |
AWS Glue DataBrew: StartJobRun | Sales DataBrew ETL Job | { "Name": "sales-data-etl" } | Drop Old Sales Table |
Amazon Athena: StartQueryExecution | Drop Old Sales Table | { "QueryString": "DROP TABLE IF EXISTS sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Sales Table |
Amazon Athena: StartQueryExecution | Create Sales Table | { "QueryString": "CREATE EXTERNAL TABLE sales_data_output (date string, salesperson string, lead_name string, segment string, region string, target_close string, forecasted_monthly_revenue int, opportunity_stage string, weighted_revenue int, closed_opportunity boolean, active_opportunity boolean, latest_status_entry boolean) PARTITIONED BY (year string, month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/sales/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Load Sales Table Partitions |
Amazon Athena: StartQueryExecution | Load Sales Table Partitions | { "QueryString": "MSCK REPAIR TABLE sales_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Go to end |
AWS Glue DataBrew: StartJobRun | Marketing DataBrew ETL Job | { "Name": "marketing-data-etl" } | Drop Old Marketing Table |
Amazon Athena: StartQueryExecution | Drop Old Marketing Table | { "QueryString": "DROP TABLE IF EXISTS marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Marketing Table |
Amazon Athena: StartQueryExecution | Create Marketing Table | { "QueryString": "CREATE EXTERNAL TABLE marketing_data_output (date string, new_visitors_seo int, new_visitors_cpc int, new_visitors_social_media int, return_visitors int, twitter_mentions int, twitter_follower_adds int, twitter_followers_cumulative int, mailing_list_adds_ int, mailing_list_cumulative int, website_pageviews int, website_visits int, website_unique_visits int, mobile_uniques int, tablet_uniques int, desktop_uniques int, free_sign_up int, paid_conversion int, events string) PARTITIONED BY (year string, month string, day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://TRANSFORMED_DATA_BUCKET/marketing/' TBLPROPERTIES ('classification'='parquet', 'compressionType'='none', 'typeOfData'='file')", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Load Marketing Table Partitions |
Amazon Athena: StartQueryExecution | Load Marketing Table Partitions | { "QueryString": "MSCK REPAIR TABLE marketing_data_output", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Go to end |
Amazon Athena: StartQueryExecution | Drop Old Summarized Table | { "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Create Summarized Output |
Amazon Athena: StartQueryExecution | Create Summarized Output | { "QueryString": "CREATE TABLE default.sales_marketing_revenue AS SELECT * FROM (SELECT sales.year, sales.month, total_paid_conversion, total_weighted_revenue FROM (SELECT year, month, sum(paid_conversion) as total_paid_conversion FROM default.marketing_data_output group by year, month) sales INNER JOIN (SELECT year, month, sum(weighted_revenue) as total_weighted_revenue FROM default.sales_data_output group by year, month) marketing on sales.year=marketing.year AND sales.month=marketing.month) ORDER BY year DESC, month DESC", "WorkGroup": "StepFunctionsWorkshopWorkgroup", "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" } } | Notify Users |
SNS: Publish | Notify Users | Topic ARN: Select the "StepFunctionWorkshopTopic" SNS Topic, Message: { "Input": "Monthly sales marketing data refreshed successfully!" } Do not check "Wait for callback" | Go to end |
Note that every ATHENA_QUERY_BUCKET and TRANSFORMED_DATA_BUCKET in the table above must be replaced with the corresponding bucket names from the CloudFormation stack outputs. For example:
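If the CloudFormation output named the Athena query bucket something like stepfunctions-workshop-athenaquerybucket-1a2b3c (a hypothetical name; use your own stack's output value), the Drop Old Sales Table parameters would become:

```json
{
  "QueryString": "DROP TABLE IF EXISTS sales_data_output",
  "WorkGroup": "StepFunctionsWorkshopWorkgroup",
  "ResultConfiguration": {
    "OutputLocation": "s3://stepfunctions-workshop-athenaquerybucket-1a2b3c/"
  }
}
```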
The last SNS: Publish component takes no API Parameters; instead, select the previously created topic:
Also note that Wait for task to complete must be checked for each state, so that each subsequent task starts only after the previous one has finished:
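In the generated ASL, checking Wait for task to complete corresponds to the .sync suffix on the task's resource ARN (the "Run a Job" integration pattern), while the SNS "Wait for callback" box corresponds to the .waitForTaskToken suffix instead. For example, the Sales DataBrew task is rendered roughly as:

```json
{
  "Type": "Task",
  "Resource": "arn:aws:states:::databrew:startJobRun.sync",
  "Parameters": { "Name": "sales-data-etl" },
  "Next": "Drop Old Sales Table"
}
```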
Edit the State Machine name and call it GlueWorkshopStudioStateMachine:
For the Execution Role, choose the one that CloudFormation created ahead of time (find it among the resources created by the CloudFormation stack). If you let Step Functions create a role automatically (Create new role), the generated role is not smart enough and execution may fail with insufficient permissions; one workaround is to attach Administrator permissions to the generated role:
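If you prefer not to attach Administrator, a rough least-privilege sketch of what this workflow needs is shown below. This is illustrative only, assuming the .sync integrations (which also need the stop/describe actions) and that you scope Resource down to your own buckets, jobs, and topic rather than "*":

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "databrew:StartJobRun",
        "databrew:StopJobRun",
        "databrew:ListJobRuns",
        "athena:StartQueryExecution",
        "athena:StopQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults",
        "glue:GetDatabase",
        "glue:GetTable",
        "glue:CreateTable",
        "glue:DeleteTable",
        "glue:BatchCreatePartition",
        "glue:GetPartitions",
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket",
        "s3:GetBucketLocation",
        "sns:Publish"
      ],
      "Resource": "*"
    }
  ]
}
```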
Enable Logging, then click Create.
Creating the state machine with Amazon States Language
Besides creating the state machine visually with Workflow Studio, you can also directly import an already-written JSON definition. The state machine above can equally be defined in JSON.
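An abbreviated sketch of that JSON definition follows. For brevity it elides the Create/Load Athena states, the long CREATE TABLE query strings, and the Create Summarized Output step, all of which follow the same Task pattern; the ATHENA_QUERY_BUCKET placeholder and the SNS topic ARN must be replaced with your own values:

```json
{
  "Comment": "Abbreviated sketch of GlueWorkshopStudioStateMachine",
  "StartAt": "Refresh Sales Marketing Data",
  "States": {
    "Refresh Sales Marketing Data": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Sales DataBrew ETL Job",
          "States": {
            "Sales DataBrew ETL Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::databrew:startJobRun.sync",
              "Parameters": { "Name": "sales-data-etl" },
              "Next": "Drop Old Sales Table"
            },
            "Drop Old Sales Table": {
              "Type": "Task",
              "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
              "Parameters": {
                "QueryString": "DROP TABLE IF EXISTS sales_data_output",
                "WorkGroup": "StepFunctionsWorkshopWorkgroup",
                "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Marketing DataBrew ETL Job",
          "States": {
            "Marketing DataBrew ETL Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::databrew:startJobRun.sync",
              "Parameters": { "Name": "marketing-data-etl" },
              "End": true
            }
          }
        }
      ],
      "Next": "Drop Old Summarized Table"
    },
    "Drop Old Summarized Table": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "DROP TABLE IF EXISTS default.sales_marketing_revenue",
        "WorkGroup": "StepFunctionsWorkshopWorkgroup",
        "ResultConfiguration": { "OutputLocation": "s3://ATHENA_QUERY_BUCKET/" }
      },
      "Next": "Notify Users"
    },
    "Notify Users": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:us-east-1:111122223333:StepFunctionWorkshopTopic",
        "Message": { "Input": "Monthly sales marketing data refreshed successfully!" }
      },
      "End": true
    }
  }
}
```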