更新时间:2022-04-22 GMT+08:00
分享

自定义运行时

场景说明

运行时负责运行函数的设置代码、从环境变量读取处理程序名称以及从FunctionGraph运行时API读取调用事件。运行时会将事件数据传递给函数处理程序,并将来自处理程序的响应返回给FunctionGraph。

FunctionGraph支持自定义编程语言运行时。您可以使用可执行文件(名称为bootstrap)的形式将运行时包含在函数的程序包中,当调用一个FunctionGraph函数时,它将运行函数的处理程序方法。

自定义的运行时在FunctionGraph执行环境中运行,它可以是Shell脚本,也可以是可在linux可执行的二进制文件。

在本地开发程序之后打包,必须是ZIP包(Java、Node.js、Python、Go)或者JAR包(Java),上传至FunctionGraph即可运行,无需其它的部署操作。制作ZIP包的时候,单函数入口文件必须在根目录,保证解压后,直接出现函数执行入口文件,才能正常运行。

对于Go runtime,必须在编译之后打zip包,编译后的动态库文件名称必须与函数执行入口的插件名称保持一致,例如:动态库名称为testplugin.so,则“函数执行入口”命名为testplugin.Handler。

运行时文件bootstrap说明

如果程序包中存在一个名为bootstrap的文件,FunctionGraph将执行该文件。如果引导文件未找到或不是可执行文件,函数在调用后将返回错误。

运行时代码负责完成一些初始化任务,它将在一个循环中处理调用事件,直到它被终止。

初始化任务将对函数的每个实例运行一次以准备用于处理调用的环境。

运行时接口说明

FunctionGraph提供了用于自定义运行时的HTTP API来接收来自函数的调用事件,并在FunctionGraph执行环境中发送回响应数据。

  • 获取调用

    方法 – Get

    路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/request

    该接口用来获取下一个事件,响应正文包含事件数据。响应标头包含信息如下。

    表1 响应标头信息说明

    参数

    说明

    X-Cff-Request-Id

    请求ID。

    X-CFF-Access-Key

    租户AccessKey,使用该特殊变量需要给函数配置委托。

    X-CFF-Auth-Token

    Token,使用该特殊变量需要给函数配置委托。

    X-CFF-Invoke-Type

    函数执行类型。

    X-CFF-Secret-Key

    租户SecretKey,使用该特殊变量需要给函数配置委托。

    X-CFF-Security-Token

    Security token,使用该特殊变量需要给函数配置委托。

  • 调用响应

    方法 – POST

    路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/response/$REQUEST_ID

    该接口将正确的调用响应发送到FunctionGraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。

  • 错误上报

    方法 – POST

    路径 – http://$RUNTIME_API_ADDR/v1/runtime/invocation/error/$REQUEST_ID

    $REQUEST_ID为获取事件的响应header中X-Cff-Request-Id变量值,说明请参考表1

    $RUNTIME_API_ADDR为系统环境变量,说明请参考表2

    该接口将错误的调用响应发送到FunctionGraph。在运行时调用函数处理程序后,将来自函数的响应发布到调用响应路径。

运行时环境变量说明

下面是FunctionGraph执行环境中运行时相关的环境变量列表,除此之外,还有用户自定义的环境变量,都可以在函数代码中直接使用。

表2 环境变量说明

值说明

RUNTIME_PROJECT_ID

projectID

RUNTIME_FUNC_NAME

函数名称

RUNTIME_FUNC_VERSION

函数的版本

RUNTIME_PACKAGE

函数组

RUNTIME_HANDLER

函数执行入口

RUNTIME_TIMEOUT

函数超时时间

RUNTIME_USERDATA

用户通过环境变量传入的值

RUNTIME_CPU

分配的CPU数

RUNTIME_MEMORY

分配的内存

RUNTIME_CODE_ROOT

包含函数代码的目录

RUNTIME_API_ADDR

自定义运行时API的主机和端口

用户定义的环境变量也同FunctionGraph环境变量一样,可通过环境变量获取方式直接获取用户定义环境变量。

示例说明

此示例包含1个文件(bootstrap文件),该文件都在Bash中实施。

运行时将从部署程序包加载函数脚本。它使用两个变量来查找脚本。

引导文件bootstrap内容如下:

#!/bin/sh

set -o pipefail

#Processing requests loop
while true
do
 
HEADERS="$(mktemp)"
  # Get an event
  EVENT_DATA=$(curl
-sS -LD "$HEADERS" -X GET
"http://$RUNTIME_API_ADDR/v1/runtime/invocation/request")
  # Get request id
from response header
  REQUEST_ID=$(grep
-Fi x-cff-request-id "$HEADERS" | tr -d '[:space:]' | cut -d: -f2)
  if [ -z
"$REQUEST_ID" ]; then
    continue
  fi
  # Process request
data
 
RESPONSE="Echoing request: '$EVENT_DATA'"
  # Put response
  curl -X POST
"http://$RUNTIME_API_ADDR/v1/runtime/invocation/response/$REQUEST_ID"
-d "$RESPONSE"
done

加载脚本后,运行时将在一个循环中处理事件。它使用运行时API从FunctionGraph检索调用事件,将事件传递到处理程序,并将响应发布回给FunctionGraph。

为了获取请求ID,运行时会将来自API响应的标头保存到临时文件,并从该文件读取x-cff-request-id读取请求头的请求唯一标识。将获取到的事件数据做处理并响应发布返回FunctionGraph。

go源码示例,需要通过编译后才可执行。

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net"
    "net/http"
    "os"
    "strings"
    "time"
)

var (
    getRequestUrl           = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/request")
    putResponseUrl          = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/response/{REQUEST_ID}")
    putErrorResponseUrl     = os.ExpandEnv("http://${RUNTIME_API_ADDR}/v1/runtime/invocation/error/{REQUEST_ID}")
    requestIdInvalidError   = fmt.Errorf("request id invalid")
    noRequestAvailableError = fmt.Errorf("no request available")
    putResponseFailedError  = fmt.Errorf("put response failed")
    functionPackage         = os.Getenv("RUNTIME_PACKAGE")
    functionName            = os.Getenv("RUNTIME_FUNC_NAME")
    functionVersion         = os.Getenv("RUNTIME_FUNC_VERSION")

    client = http.Client{
        Transport: &http.Transport{
            DialContext: (&net.Dialer{
                Timeout: 3 * time.Second,
            }).DialContext,
        },
    }
)

func main() {
    // main loop for processing requests.
    for {
        requestId, header, payload, err := getRequest()
        if err != nil {
            time.Sleep(50 * time.Millisecond)
            continue
        }

        result, err := processRequestEvent(requestId, header, payload)
        err = putResponse(requestId, result, err)
        if err != nil {
            log.Printf("put response failed, err: %s.", err.Error())
        }
    }
}

// event processing function
func processRequestEvent(requestId string, header http.Header, evtBytes []byte) ([]byte, error) {
    log.Printf("processing request '%s'.", requestId)
    result := fmt.Sprintf("function: %s:%s:%s, request id: %s, headers: %+v, payload: %s", functionPackage, functionName,
        functionVersion, requestId, header, string(evtBytes))
    
    var event FunctionEvent
    err := json.Unmarshal(evtBytes, &event)
    if err != nil {
        return (&ErrorMessage{ErrorType: "invalid event", ErrorMessage: "invalid json formated event"}).toJsonBytes(), err
    }

    return (&APIGFormatResult{StatusCode: 200, Body: result}).toJsonBytes(), nil
}

func getRequest() (string, http.Header, []byte, error) {
    resp, err := client.Get(getRequestUrl)
    if err != nil {
        log.Printf("get request error, err: %s.", err.Error())
        return "", nil, nil, err
    }
    defer resp.Body.Close()

    // get request id from response header
    requestId := resp.Header.Get("X-CFF-Request-Id")
    if requestId == "" {
        log.Printf("request id not found.")
        return "", nil, nil, requestIdInvalidError
    }

    payload, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Printf("read request body error, err: %s.", err.Error())
        return "", nil, nil, err
    }

    if resp.StatusCode != 200 {
        log.Printf("get request failed, status: %d, message: %s.", resp.StatusCode, string(payload))
        return "", nil, nil, noRequestAvailableError
    }

    log.Printf("get request ok.")
    return requestId, resp.Header, payload, nil
}

func putResponse(requestId string, payload []byte, err error) error {
    var body io.Reader
    if payload != nil && len(payload) > 0 {
        body = bytes.NewBuffer(payload)
    }

    url := ""
    if err == nil {
        url = strings.Replace(putResponseUrl, "{REQUEST_ID}", requestId, -1)
    } else {
        url = strings.Replace(putErrorResponseUrl, "{REQUEST_ID}", requestId, -1)
    }

    resp, err := client.Post(strings.Replace(url, "{REQUEST_ID}", requestId, -1), "", body)
    if err != nil {
        log.Printf("put response error, err: %s.", err.Error())
        return err
    }
    defer resp.Body.Close()

    responsePayload, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Printf("read request body error, err: %s.", err.Error())
        return err
    }

    if resp.StatusCode != 200 {
        log.Printf("put response failed, status: %d, message: %s.", resp.StatusCode, string(responsePayload))
        return putResponseFailedError
    }

    return nil
}

type FunctionEvent struct {
    Type string `json:"type"`
    Name string `json:"name"`
}

type APIGFormatResult struct {
    StatusCode      int               `json:"statusCode"`
    IsBase64Encoded bool              `json:"isBase64Encoded"`
    Headers         map[string]string `json:"headers,omitempty"`
    Body            string            `json:"body,omitempty"`
}

func (result *APIGFormatResult) toJsonBytes() []byte {
    data, err := json.MarshalIndent(result, "", "  ")
    if err != nil {
        return nil
    }

    return data
}

type ErrorMessage struct {
    ErrorType     string `json:"errorType"`
    ErrorMessage  string `json:"errorMessage"`
}

func (errMsg *ErrorMessage) toJsonBytes() []byte {
    data, err := json.MarshalIndent(errMsg, "", "  ")
    if err != nil {
        return nil
    }

    return data
}

代码中的环境变量说明如下,请参考表3

表3 环境变量说明

环境变量

说明

RUNTIME_FUNC_NAME

函数名称

RUNTIME_FUNC_VERSION

函数的版本

RUNTIME_PACKAGE

函数组

分享:

    相关文档

    相关产品

close