更新时间:2023-04-25 GMT+08:00
开发算子代码
DWR算子开发运行依赖于FunctionGraph平台,本节将使用python开发一个下载OBS对象的算子,python使用详情参见Python函数开发指南。通过对该示例算子开发方式的说明,能够帮助算子开发者快速适应开发流程。通过本节将学到以下内容:
- 算子代码的入口结构
- 如何在算子中获得AK/SK
- 如何通过算子访问OBS
操作步骤
- 登录FunctionGraph控制台创建python函数,创建详情参见创建事件函数。
- 函数类型选择“事件函数”,该类型函数可以被事件触发,在DWR的使用过程中为OBS的事件,例如上传对象事件,下载对象事件等。
- 代码上传方式选择“默认代码”。创建成功后FunctionGraph平台会自动生成一个python代码。
图1 创建函数界面
- 编辑python代码。
如图2所示,默认代码中只有一个handler函数,入参为两个:
- event :触发事件信息,OBS事件触发的函数(DWR中称为算子)中event包含桶名,对象名。event也是自定义信息的承载体,DWR用户输入内容通过event中的动态参数(dynamic_source)保存。
- context :函数的上下文信息,例如函数执行时委托的AK/SK,Token等。更多内容可参考FunctionGraph开发文档。
- 测试代码
- 测试代码需要配置测试事件,在下拉框中选择“配置测试事件”,进入配置测试事件界面。
图3 配置测试事件
- 选择“创建新的测试事件”并选择“空白模板”,将原有内容替换为以下事件(json格式)内容。
图4 创建新的测试事件
{ "execution_name": "84a3dd2bd67f43aa9b98cdd74604ca68", "graph_name": "test_workflow", "Records": [ { "eventName": "ObjectCreated:Put", "eventRegion": "cn-north-4", "eventSource": "OBS", "eventTime": "2021-12-23T14:50:22.957Z", "eventVersion": "3.0", "obs": { "Version": "1.0", "bucket": { "bucket": "examplebucket", "name": "examplebucket", "ownerIdentity": { "ID": "08b4efe0fc00d3ce0f17c01b948f6e80" } }, "configurationId": "test-trigger", "object": { "eTag": "fc85a07cff68977bf5b2108e7436ca2d", "key": "exampleobject.docx", "oldpsxpth": "", "sequencer": "1", "size": "524298", "versionId": "G001017DE60E176D0000401106696610null" } }, "requestParameters": { "sourceIPAddress": "x.x.x.x" }, "responseElements": { "x-obs-id-2": "", "x-obs-request-id": "84a3dd2bd67f43aa9b98cdd74604ca68" }, "userIdentity": { "ID": "08b4efe0fc00d3ce0f17c01b948f6e80" } } ], "inputs": { "parametername": "parametervalue" }, "dynamic_source": { "parametername": "parametervalue", "bucketname": "inputBucketName", "object": "inputObject" } }
- 点击“保存”后,即可点击“测试”按钮查看运行结果。
可以在“执行结果”中看到保存的事件信息完整的打印在了参数body中,此函数已具备接收OBS事件的能力,可作为算子在DWR中运行。接下来我们将在其中添加“下载OBS对象功能”让函数功能更加丰富。
图5 测试结果
- 测试代码需要配置测试事件,在下拉框中选择“配置测试事件”,进入配置测试事件界面。
- 添加OBS下载逻辑(涉及AK/SK获取)
下载OBS的方式参考下载对象,示例采用二进制下载方式,主要代码如下所示。
如图6所示,执行结果中已经获取到目标对象的大小 size: 292685。# -*- coding:utf-8 -*- import json # 引入模块 from obs import ObsClient import os def handler (event, context): # 获取AK/SK ak = context.getAccessKey() sk = context.getSecretKey() regionid = os.getenv('region') # dynamic_source是event中一个特殊结构体,通过它算子提供方可以获取用户在DWR中配置的参数 bucketname = event["dynamic_source"]["bucketname"] objectName = event["dynamic_source"]["object"] print("ak : {}, sk : {}".format(ak, sk) ) print("bucket name in dynamic_source is : {}".format(bucketname) ) # 创建ObsClient实例 obsClient = ObsClient( access_key_id=ak, secret_access_key=sk, server='https://obs.{}.huawei.com'.format(regionid) ) object_size = 0 try: resp = obsClient.getObject(bucketname, objectName, loadStreamInMemory=True) if resp.status < 300: # 获取对象内容 object_size = resp.body.size print('size:', resp.body.size) else: print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: import traceback print(traceback.format_exc()) # 使用访问OBS # 关闭obsClient obsClient.close() return {"statusCode":200, "dynamic_resource_bucketName":bucketname, "dynamic_resource_objectName":objectName, "object_size":object_size}
- AK/SK通过context 上下文对象保存,需要注意的是这里的AK/SK属于函数配置中的委托,故AK/SK的权限与该委托中的授权范围相同,如果需访问OBS,用户需要对委托进行授权并配置到FunctionGraph的函数配置项中。
- 桶名以及对象名通过event进行获取。如图2所示,桶名、对象等信息保存在event中,代码中dynamic_source是event中一个特殊结构体,通过它算子提供方可以获取用户在DWR中配置的参数。这也是算子提供方获取用户配置参数的主要方式,后文将对该参数进行更加详细的介绍。bucketname以及object的值根据测试需要,点击函数测试参数配置。
- obsclient形参中的server参数为OBS的endpoint,详情参见地区和终端节点。 不同的region拥有不同的endpoint,其主要区别在于regionid,其值通过配置环境变量完成。
- 代码中最后一个return语句为必选代码块,工作流的执行过程中要求算子的返回结果为一个json体(对json中的内容无要求),否则将会发生执行异常。
父主题: 发布下载OBS对象的算子