文档首页/ 应用平台 AppStage/ 常见问题/ AI原生应用引擎/ 如何在工作流中定制逻辑实现特定任务,比如时间格式转换、数组对象的push等
更新时间:2024-12-18 GMT+08:00
分享

如何在工作流中定制逻辑实现特定任务,比如时间格式转换、数组对象的push等

工作流提供了Code代码节点,通过Code代码节点可以在工作流中编写Python代码,根据具体需求定制逻辑以实现特定的任务,如图1所示,具体介绍请参见Code代码

图1 Code代码节点配置

时间格式转换代码示例:

# -*- coding:utf-8 -*-
import json
import base64
import datetime
"""
公共函数使用方法示例
import common
headers = {}
body = ""
data = common.httpRequest("http://localhost:3300/test", headers, body, "POST")
if data.get("code") < 300:
    return data.get("body")
return "error: " + data.get("error")
接口返回res = {"headers": {},
              "body": string,
              "code": number,
              "error": string}
"""
""""
mssiAuthData参数样例
{
     "header":{}, // 连接器认证header参数
     "path": {}, // 连接器认证path参数
     "query":{}, // 连接器认证query参数
     "body":{}, // 连接器认证body参数
     "host":"https://demo.com // API主机地址
}
"""
def extractRequestParam(rawValue, encoded, defaultValue):
    if encoded and rawValue:
        rawValue = str(base64.b64decode(rawValue), "utf-8")
    return json.loads(rawValue) if rawValue else defaultValue
## 请勿对下面的函数做修改
def handler(event, context):
    """
    函数是方法的入口
    :param event: 执行事件(event), 包含用户定义的函数参数以及所选择的的连接器认证相关参数
    :param context: Runtime提供的函数执行上下文
    :return:
    """
    isBase64Encoded = event.get('isBase64Encoded', False)
    inputData = extractRequestParam(event.get('body'), isBase64Encoded, {})  # 用户定义的函数参数数据
    mssiAuthData = extractRequestParam(event.get('mssiAuthData'), isBase64Encoded, {})  # 连接器认证数据
    mssiAuthData["securityToken"] = context.getToken()
    dataExtendConfig = extractRequestParam(event.get('dataExtendConfig'), isBase64Encoded, {})  # 流步骤扩展参数
 
    origin_time = inputData.get('time')
    print(origin_time)
    # 字符串转datetime
    dt_obj = datetime.datetime.strptime(origin_time, '%Y-%m-%d %H:%M:%S')
    
    # datetime转字符串
    formatted_str = dt_obj.strftime('%d/%m/%Y %H:%M:%S')
    result = {"formatted_time":formatted_str}
    return json.dumps(result)

相关文档