定制运行时语言
场景说明
运行时负责运行函数的设置代码、从环境变量读取处理程序名称以及从FunctionGraph运行时API读取调用事件。运行时会将事件数据传递给函数处理程序,并将来自处理程序的响应返回给FunctionGraph。
FunctionGraph支持自定义编程语言运行时。您可以使用可执行文件(名称为bootstrap)的形式将运行时包含在函数的程序包中,当调用一个FunctionGraph函数时,它将运行函数的处理程序方法。
自定义的运行时在FunctionGraph执行环境中运行,它可以是Shell脚本,也可以是可在linux可执行的二进制文件。
在本地开发程序之后打包,必须是ZIP包(Java、Node.js、Python、Go)或者JAR包(Java),上传至FunctionGraph即可运行,无需其它的部署操作。制作ZIP包的时候,单函数入口文件必须在根目录,保证解压后,直接出现函数执行入口文件,才能正常运行。
对于Go runtime,必须在编译之后打zip包,编译后的动态库文件名称必须与函数执行入口的插件名称保持一致,例如:动态库名称为testplugin.so,则“函数执行入口”命名为testplugin.Handler。
编译说明
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build main.go
运行时文件bootstrap说明
如果程序包中存在一个名为bootstrap的文件,FunctionGraph将执行该文件。如果引导文件未找到或不是可执行文件,函数在调用后将返回错误。
运行时代码负责完成一些初始化任务,它将在一个循环中处理调用事件,直到它被终止。
初始化任务将对函数的每个实例运行一次以准备用于处理调用的环境。
运行时接口说明
FunctionGraph提供了用于自定义运行时的HTTP API来接收来自函数的调用事件,并在FunctionGraph执行环境中发送回响应数据。
- 获取调用
方法 – Get
路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/request
该接口用来获取下一个事件,响应正文包含事件数据。响应标头包含信息如下。
- 调用响应
方法 – POST
路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/response/$REQUEST_ID
该接口将正确的调用响应发送到FunctionGraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。
- 错误上报
方法 – POST
路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/error/$REQUEST_ID
$REQUEST_ID为获取事件的响应header中X-Cff-Request-Id变量值,说明请参见表1。
$RUNTIME_API_ADDR为系统环境变量,说明请参见表2。
该接口将错误的调用响应发送到FunctionGraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。
运行时环境变量说明
下面是FunctionGraph执行环境中运行时相关的环境变量列表,除此之外,还有用户自定义的环境变量,都可以在函数代码中直接使用。
键 |
值说明 |
---|---|
RUNTIME_PROJECT_ID |
projectID |
RUNTIME_FUNC_NAME |
函数名称 |
RUNTIME_FUNC_VERSION |
函数的版本 |
RUNTIME_PACKAGE |
函数组 |
RUNTIME_HANDLER |
函数执行入口 |
RUNTIME_TIMEOUT |
函数超时时间 |
RUNTIME_USERDATA |
用户通过环境变量传入的值 |
RUNTIME_CPU |
分配的CPU数 |
RUNTIME_MEMORY |
分配的内存 |
RUNTIME_CODE_ROOT |
包含函数代码的目录 |
RUNTIME_API_ADDR |
自定义运行时API的主机和端口 |
用户定义的环境变量也同FunctionGraph环境变量一样,可通过环境变量获取方式直接获取用户定义环境变量。
示例说明
此示例包含1个文件(bootstrap文件),该文件都在Bash中实施。
运行时将从部署程序包加载函数脚本。它使用两个变量来查找脚本。
引导文件bootstrap内容如下:
#!/bin/sh set -o pipefail #Processing requests loop while true do HEADERS="$(mktemp)" # Get an event EVENT_DATA=$(curl -sS -LD "$HEADERS" -X GET "http://$RUNTIME_API_ADDR/v1/runtime/invocation/request") # Get request id from response header REQUEST_ID=$(grep -Fi x-cff-request-id "$HEADERS" | tr -d '[:space:]' | cut -d: -f2) if [ -z "$REQUEST_ID" ]; then continue fi # Process request data RESPONSE="Echoing request: hello world!" # Put response curl -X POST "http://$RUNTIME_API_ADDR/v1/runtime/invocation/response/$REQUEST_ID" -d "$RESPONSE" done
加载脚本后,运行时将在一个循环中处理事件。它使用运行时API从FunctionGraph检索调用事件,将事件传递到处理程序,并将响应发布回给FunctionGraph。
为了获取请求ID,运行时会将来自API响应的标头保存到临时文件,并从该文件读取x-cff-request-id读取请求头的请求唯一标识。将获取到的事件数据做处理并响应发布返回FunctionGraph。
go源码示例,需要通过编译后才可执行。
package main import ( "bytes" "encoding/json" "fmt" "io" "io/ioutil" "log" "net" "net/http" "os" "strings" "time" ) var ( getRequestUrl = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/request") putResponseUrl = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/response/{REQUEST_ID}") putErrorResponseUrl = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/error/{REQUEST_ID}") requestIdInvalidError = fmt.Errorf("request id invalid") noRequestAvailableError = fmt.Errorf("no request available") putResponseFailedError = fmt.Errorf("put response failed") functionPackage = os.Getenv("RUNTIME_PACKAGE") functionName = os.Getenv("RUNTIME_FUNC_NAME") functionVersion = os.Getenv("RUNTIME_FUNC_VERSION") client = http.Client{ Transport: &http.Transport{ DialContext: (&net.Dialer{ Timeout: 3 * time.Second, }).DialContext, }, } ) func main() { // main loop for processing requests. for { requestId, header, payload, err := getRequest() if err != nil { time.Sleep(50 * time.Millisecond) continue } result, err := processRequestEvent(requestId, header, payload) err = putResponse(requestId, result, err) if err != nil { log.Printf("put response failed, err: %s.", err.Error()) } } } // event processing function func processRequestEvent(requestId string, header http.Header, evtBytes []byte) ([]byte, error) { log.Printf("processing request '%s'.", requestId) result := fmt.Sprintf("function: %s:%s:%s, request id: %s, headers: %+v, payload: %s", functionPackage, functionName, functionVersion, requestId, header, string(evtBytes)) var event FunctionEvent err := json.Unmarshal(evtBytes, &event) if err != nil { return (&ErrorMessage{ErrorType: "invalid event", ErrorMessage: "invalid json formated event"}).toJsonBytes(), err } return (&APIGFormatResult{StatusCode: 200, Body: result}).toJsonBytes(), nil } func getRequest() (string, http.Header, []byte, error) { resp, err := client.Get(getRequestUrl) if err != nil { log.Printf("get request error, err: %s.", err.Error()) return "", nil, nil, err } defer resp.Body.Close() // get request id from response header requestId := resp.Header.Get("X-CFF-Request-Id") if requestId == "" { log.Printf("request id not found.") return "", nil, nil, requestIdInvalidError } payload, err := ioutil.ReadAll(resp.Body) if err != nil { log.Printf("read request body error, err: %s.", err.Error()) return "", nil, nil, err } if resp.StatusCode != 200 { log.Printf("get request failed, status: %d, message: %s.", resp.StatusCode, string(payload)) return "", nil, nil, noRequestAvailableError } log.Printf("get request ok.") return requestId, resp.Header, payload, nil } func putResponse(requestId string, payload []byte, err error) error { var body io.Reader if payload != nil && len(payload) > 0 { body = bytes.NewBuffer(payload) } url := "" if err == nil { url = strings.Replace(putResponseUrl, "{REQUEST_ID}", requestId, -1) } else { url = strings.Replace(putErrorResponseUrl, "{REQUEST_ID}", requestId, -1) } resp, err := client.Post(strings.Replace(url, "{REQUEST_ID}", requestId, -1), "", body) if err != nil { log.Printf("put response error, err: %s.", err.Error()) return err } defer resp.Body.Close() responsePayload, err := ioutil.ReadAll(resp.Body) if err != nil { log.Printf("read request body error, err: %s.", err.Error()) return err } if resp.StatusCode != 200 { log.Printf("put response failed, status: %d, message: %s.", resp.StatusCode, string(responsePayload)) return putResponseFailedError } return nil } type FunctionEvent struct { Type string `json:"type"` Name string `json:"name"` } type APIGFormatResult struct { StatusCode int `json:"statusCode"` IsBase64Encoded bool `json:"isBase64Encoded"` Headers map[string]string `json:"headers,omitempty"` Body string `json:"body,omitempty"` } func (result *APIGFormatResult) toJsonBytes() []byte { data, err := json.MarshalIndent(result, "", " ") if err != nil { return nil } return data } type ErrorMessage struct { ErrorType string `json:"errorType"` ErrorMessage string `json:"errorMessage"` } func (errMsg *ErrorMessage) toJsonBytes() []byte { data, err := json.MarshalIndent(errMsg, "", " ") if err != nil { return nil } return data }
代码中的环境变量说明如下,请参见表3。
支持流式返回
自定义运行时支持在函数中返回超过6MB并且不超过200MB的大报文,以数据流的方式返回。
- 登录函数工作流控制台,单击自定义运行时的函数名称进入详情页。
- 选择“设置 > 高级配置”,开启“流式返回”。