更新时间:2023-04-25 GMT+08:00
分享

开发算子代码

DWR算子开发运行依赖于FunctionGraph平台,本节将使用python开发一个下载OBS对象的算子,python使用详情参见Python函数开发指南。通过对该示例算子开发方式的说明,能够帮助算子开发者快速适应开发流程。通过本节将学到以下内容:
  • 算子代码的入口结构
  • 如何在算子中获得AK/SK
  • 如何通过算子访问OBS

操作步骤

  1. 登录FunctionGraph控制台创建python函数,创建详情参见创建事件函数

    1. 函数类型选择“事件函数”,该类型函数可以被事件触发,在DWR的使用过程中为OBS的事件,例如上传对象事件,下载对象事件等。
    2. 代码上传方式选择“默认代码”。创建成功后FunctionGraph平台会自动生成一个python代码。
    图1 创建函数界面

  2. 编辑python代码。

    图2所示,默认代码中只有一个handler函数,入参为两个:
    • event :触发事件信息,OBS事件触发的函数(DWR中称为算子)中event包含桶名,对象名。event也是自定义信息的承载体,DWR用户输入内容通过event中的动态参数(dynamic_source)保存。
    • context :函数的上下文信息,例如函数执行时委托的AK/SK,Token等。更多内容可参考FunctionGraph开发文档
    图2 FunctionGraph编辑代码界面

  3. 测试代码

    1. 测试代码需要配置测试事件,在下拉框中选择“配置测试事件”,进入配置测试事件界面。
      图3 配置测试事件
    2. 选择“创建新的测试事件”并选择“空白模板”,将原有内容替换为以下事件(json格式)内容。
      图4 创建新的测试事件
      {
          "execution_name": "84a3dd2bd67f43aa9b98cdd74604ca68",
          "graph_name": "test_workflow",
          "Records": [
              {
                  "eventName": "ObjectCreated:Put",
                  "eventRegion": "cn-north-4",
                  "eventSource": "OBS",
                  "eventTime": "2021-12-23T14:50:22.957Z",
                  "eventVersion": "3.0",
                  "obs": {
                      "Version": "1.0",
                      "bucket": {
                          "bucket": "examplebucket",
                          "name": "examplebucket",
                          "ownerIdentity": {
                              "ID": "08b4efe0fc00d3ce0f17c01b948f6e80"
                          }
                      },
                      "configurationId": "test-trigger",
                      "object": {
                          "eTag": "fc85a07cff68977bf5b2108e7436ca2d",
                          "key": "exampleobject.docx",
                          "oldpsxpth": "",
                          "sequencer": "1",
                          "size": "524298",
                          "versionId": "G001017DE60E176D0000401106696610null"
                      }
                  },
                  "requestParameters": {
                      "sourceIPAddress": "x.x.x.x"
                  },
                  "responseElements": {
                      "x-obs-id-2": "",
                      "x-obs-request-id": "84a3dd2bd67f43aa9b98cdd74604ca68"
                  },
                  "userIdentity": {
                      "ID": "08b4efe0fc00d3ce0f17c01b948f6e80"
                  }
              }
          ],
          "inputs": {
              "parametername": "parametervalue"
          },
          "dynamic_source": {
              "parametername": "parametervalue",
      	"bucketname": "inputBucketName",
              "object": "inputObject"
          }
      }
    3. 点击“保存”后,即可点击“测试”按钮查看运行结果。

      可以在“执行结果”中看到保存的事件信息完整的打印在了参数body中,此函数已具备接收OBS事件的能力,可作为算子在DWR中运行。接下来我们将在其中添加“下载OBS对象功能”让函数功能更加丰富。

      图5 测试结果

  4. 添加OBS下载逻辑(涉及AK/SK获取)

    下载OBS的方式参考下载对象,示例采用二进制下载方式,主要代码如下所示。

    图6所示,执行结果中已经获取到目标对象的大小 size: 292685。
    # -*- coding:utf-8 -*-
    import json
    # 引入模块
    from obs import ObsClient
    import os
    
    def handler (event, context):
        # 获取AK/SK
        ak = context.getAccessKey()
        sk = context.getSecretKey()
        regionid = os.getenv('region')
        # dynamic_source是event中一个特殊结构体,通过它算子提供方可以获取用户在DWR中配置的参数
        bucketname = event["dynamic_source"]["bucketname"]
        objectName = event["dynamic_source"]["object"]
        print("ak : {}, sk :  {}".format(ak, sk)  )
    
        print("bucket name in dynamic_source is  : {}".format(bucketname)  )
        # 创建ObsClient实例
        obsClient = ObsClient(
            access_key_id=ak,    
            secret_access_key=sk,    
            server='https://obs.{}.huawei.com'.format(regionid)
        )
        object_size = 0
        try:
            resp = obsClient.getObject(bucketname, objectName, loadStreamInMemory=True) 
    
            if resp.status < 300: 
                # 获取对象内容 
                object_size = resp.body.size
                print('size:', resp.body.size) 
            else: 
                print('errorCode:', resp.errorCode) 
                print('errorMessage:', resp.errorMessage)
        except:
            import traceback
            print(traceback.format_exc())
        # 使用访问OBS
    
        # 关闭obsClient
        obsClient.close()
    
        return {"statusCode":200,
                "dynamic_resource_bucketName":bucketname,
                "dynamic_resource_objectName":objectName,
                "object_size":object_size}
    图6 OBS下载对象流程执行结果
    1. AK/SK通过context 上下文对象保存,需要注意的是这里的AK/SK属于函数配置中的委托,故AK/SK的权限与该委托中的授权范围相同,如果需访问OBS,用户需要对委托进行授权并配置到FunctionGraph的函数配置项中。
    2. 桶名以及对象名通过event进行获取。如图2所示,桶名、对象等信息保存在event中,代码中dynamic_source是event中一个特殊结构体,通过它算子提供方可以获取用户在DWR中配置的参数。这也是算子提供方获取用户配置参数的主要方式,后文将对该参数进行更加详细的介绍。bucketname以及object的值根据测试需要,点击函数测试参数配置。
    3. obsclient形参中的server参数为OBS的endpoint,详情参见地区和终端节点。 不同的region拥有不同的endpoint,其主要区别在于regionid,其值通过配置环境变量完成。
    4. 代码中最后一个return语句为必选代码块,工作流的执行过程中要求算子的返回结果为一个json体(对json中的内容无要求),否则将会发生执行异常。

相关文档