更新时间:2025-08-11 GMT+08:00
如何在工作流中定制逻辑实现特定任务,比如时间格式转换、数组对象的push等
通过Code代码节点可以在工作流中编写Python代码,根据具体需求定制逻辑以实现特定的任务,如图1所示,更多配置介绍请参见Code代码。
时间格式转换代码示例:
# -*- coding:utf-8 -*-
import json
import base64
import datetime
"""
公共函数使用方法示例
import common
headers = {}
body = ""
data = common.httpRequest("http://localhost:3300/test", headers, body, "POST")
if data.get("code") < 300:
return data.get("body")
return "error: " + data.get("error")
接口返回res = {"headers": {},
"body": string,
"code": number,
"error": string}
"""
""""
mssiAuthData参数样例
{
"header":{}, // 连接器认证header参数
"path": {}, // 连接器认证path参数
"query":{}, // 连接器认证query参数
"body":{}, // 连接器认证body参数
"host":"https://demo.com // API主机地址
}
"""
def extractRequestParam(rawValue, encoded, defaultValue):
if encoded and rawValue:
rawValue = str(base64.b64decode(rawValue), "utf-8")
return json.loads(rawValue) if rawValue else defaultValue
## 请勿对下面的函数做修改
def handler(event, context):
"""
函数是方法的入口
:param event: 执行事件(event), 包含用户定义的函数参数以及所选择的的连接器认证相关参数
:param context: Runtime提供的函数执行上下文
:return:
"""
isBase64Encoded = event.get('isBase64Encoded', False)
inputData = extractRequestParam(event.get('body'), isBase64Encoded, {}) # 用户定义的函数参数数据
mssiAuthData = extractRequestParam(event.get('mssiAuthData'), isBase64Encoded, {}) # 连接器认证数据
mssiAuthData["securityToken"] = context.getToken()
dataExtendConfig = extractRequestParam(event.get('dataExtendConfig'), isBase64Encoded, {}) # 流步骤扩展参数
origin_time = inputData.get('time')
print(origin_time)
# 字符串转datetime
dt_obj = datetime.datetime.strptime(origin_time, '%Y-%m-%d %H:%M:%S')
# datetime转字符串
formatted_str = dt_obj.strftime('%d/%m/%Y %H:%M:%S')
result = {"formatted_time":formatted_str}
return json.dumps(result)
父主题: AI原生应用引擎
