Updated: 2024-10-25 GMT+08:00

Configuring Synchronization Requests for Alibaba Cloud OSS

Background

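After creating a synchronization task, you need to configure synchronization requests on the source side. This ensures that the source cloud provider's object storage service calls the OMS synchronization API in real time whenever an object is added or modified (for example, through event notifications or a function compute service), so that the new and modified source objects are synchronized and migrated.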

Requirements

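  • Obtain the names of new and modified source objects in real time.
  • Call the OMS synchronization API to pass the changed source objects to Object Storage Migration Service (OMS), which then completes the synchronization from the source.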
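The two requirements above correspond to two small operations in index.py: reading the object key from the OSS trigger event, and wrapping it in the JSON body that is posted to SYNC_URL. The sketch below isolates them, assuming the event shape that index.py reads ({"events": [{"oss": {"object": {"key": ...}}}]}). The helper names extract_object_keys and build_sync_body are hypothetical, and unlike index.py, which reads only events[0], the sketch collects the key of every record in the event.

```python
import json


def extract_object_keys(event):
    """Return every object key carried by an OSS trigger event (str or bytes).

    Assumes the payload shape read by index.py:
    {"events": [{"oss": {"object": {"key": ...}}}, ...]}
    """
    payload = json.loads(event)  # json.loads accepts bytes on Python 3.6+
    return [record["oss"]["object"]["key"] for record in payload.get("events", [])]


def build_sync_body(object_keys):
    """Build the JSON body in the same shape index.py POSTs to SYNC_URL."""
    return json.dumps({"object_keys": list(object_keys)})


if __name__ == "__main__":
    sample_event = b'{"events": [{"oss": {"object": {"key": "dir/a.txt"}}}]}'
    keys = extract_object_keys(sample_event)
    print(keys)                   # ['dir/a.txt']
    print(build_sync_body(keys))  # {"object_keys": ["dir/a.txt"]}
```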

Configuration Method

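Use the source cloud provider's object storage service together with its function compute service: configure both so that adding or modifying an object triggers a synchronization request.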
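The trigger-driven flow described above (an OSS event invokes the function, the function calls OMS, and the outcome is recorded as a success or failure) can be exercised offline with the sketch below. It mirrors the control flow of handler() in index.py but replaces the HTTP call and the log-bucket write with injected callables; record_prefixes, handle_event, send and record are hypothetical names for illustration only, and the sample event and URL are placeholders rather than real OSS or OMS values.

```python
import json


def record_prefixes(sync_url):
    """Derive the success/failed record prefixes the same way index.py does.

    index.py uses the second-to-last "/"-separated segment of SYNC_URL
    as the folder name for the record objects.
    """
    segment = sync_url.split("/")[-2]
    success = "oms_source_record/{0}/success_object/".format(segment)
    failed = "oms_source_record/{0}/failed_object/".format(segment)
    return success, failed


def handle_event(event, sync_url, send, record):
    """Offline sketch of the control flow of handler() in index.py.

    send(object_key) -> HTTP status code  (hypothetical stand-in for send_oms)
    record(record_key) -> None            (hypothetical stand-in for record_list)
    Returns True only when the OMS call answered HTTP 200.
    """
    success_prefix, failed_prefix = record_prefixes(sync_url)
    # Like index.py, only the first record of the OSS event is processed.
    object_key = json.loads(event)["events"][0]["oss"]["object"]["key"]
    try:
        ok = send(object_key) == 200
    except Exception:
        ok = False  # exceptions from send (e.g. network errors) count as failures
    record((success_prefix if ok else failed_prefix) + object_key)
    return ok


if __name__ == "__main__":
    recorded = []
    sample_event = '{"events": [{"oss": {"object": {"key": "photos/cat.jpg"}}}]}'
    fake_url = "https://example.invalid/sync/task-123/requests"  # hypothetical URL
    print(handle_event(sample_event, fake_url, send=lambda key: 200, record=recorded.append))
    # True
    print(recorded)
    # ['oms_source_record/task-123/success_object/photos/cat.jpg']
```

Injecting send and record keeps the sketch free of network and OSS dependencies, so the success and failure branches can be checked locally before the real function is deployed.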

Procedure

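  1. Log in to the Alibaba Cloud Function Compute console. In the navigation pane on the left, click Functions.
  2. In the top menu bar, select a region. On the Functions page, click Create Function.

    The function must be created in the same region as the source bucket.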

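  3. On the Create Function page, select Event Function, configure the following parameters, and click Create.

    Table 1 Parameter settings

    Parameter

    Description

    Function Name

    User-defined.

    Runtime

    Select Python 3.

    Code Upload Method

    Select uploading code as a ZIP package. Package the following two code files, index.py and signer.py, into a single zip file and upload it.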

    • index.py
      # -*- coding: utf-8 -*-
      import logging
      import json
      
      import requests
      import oss2
      import signer
      
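      # Alibaba Cloud settings
      # The log bucket must not be the source bucket; otherwise the record
      # objects written to it would themselves trigger the function.
      LOG_BUCKET = "***oss log bucket***"
      OSS_ENDPOINT = "***oss endpoint***"
      
      # Huawei Cloud settings
      # Use a low-privilege AK/SK that only has IAM/OMS permissions.
      HW_AK = "***Access Key***"
      HW_SK = "***Secret Access Key***"
      # Address that receives synchronization requests; see the synchronization task details.
      SYNC_URL = "***Synchronization Request Receiving Address***"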
      
      
      def handler(event, context):
          logger = logging.getLogger()
      
          # Build OSS authorization from the function's temporary STS credentials
          creds = context.credentials
          auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token)
          oss_client = oss2.Bucket(auth, OSS_ENDPOINT, LOG_BUCKET)
      
          # Parse the OSS event and send the changed object to OMS
          send_oms_result = False
          success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2])
          failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2])
          evt = json.loads(event)["events"][0]
          object_key = evt["oss"]["object"]["key"]
          try:
              logger.info("start sync to oms!")
              resp = send_oms(object_key)
              if resp.status_code == 200:
                  logger.info("call oms api success, object_key: {0}".format(object_key))
                  record_list(logger, oss_client, success_prefix, object_key)
                  send_oms_result = True
              else:
                  logger.error(
                      "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content)))
                  record_list(logger, oss_client, failed_prefix, object_key)
          except Exception:
              logger.exception("Network error, call oms api failed!")
              record_list(logger, oss_client, failed_prefix, object_key)
          return send_oms_result
      
      
      def send_oms(object_key):
          """
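          Sign the request with APIG authentication, then send the changed object in the OSS bucket to OMS. For details about APIG signing, see (https://support.huaweicloud.com/intl/zh-cn/devg-apisign/api-sign-sdk-python.html).
          :param object_key: name of the new or modified source object
          :return: response object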
          """
          sig = signer.Signer()
          sig.Key = HW_AK
          sig.Secret = HW_SK
      
          url = SYNC_URL
          body = json.dumps({"object_keys": [object_key]})
          r = signer.HttpRequest("POST", url)
          r.headers = {"content-type": "application/json"}
          r.body = body
          sig.Sign(r)
          resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
          return resp
      
      
      def record_list(logger, log_bucket_client, prefix, object_key):
          """
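          Record the objects that were or were not successfully sent to OMS.
          :param logger: logger instance
          :param log_bucket_client: log bucket client
          :param prefix: record prefix
          :param object_key: name of the succeeded or failed object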
          :return: None
          """
          record_key = prefix + object_key
          try:
              resp = log_bucket_client.put_object(record_key, "")
              if resp.status == 200:
                  logger.info("record success or failed object success.")
              else:
                  logger.error("record success or failed object failed, object_key: {0}".format(object_key))
          except Exception:
              logger.exception("record success or failed object failed, object_key: {0}".format(object_key))
      
    • signer.py
      import copy
      import sys
      import hashlib
      import hmac
      import binascii
      from datetime import datetime
      
      if sys.version_info.major < 3:
          from urllib import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest)
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      else:
          from urllib.parse import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8'))
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      
      def urlencode(s):
          return quote(s, safe='~')
      
      
      def findHeader(r, header):
          for k in r.headers:
              if k.lower() == header.lower():
                  return r.headers[k]
          return None
      
      
      # HexEncodeSHA256Hash returns hexcode of sha256
      def HexEncodeSHA256Hash(data):
          sha256 = hashlib.sha256()
          sha256.update(data)
          return sha256.hexdigest()
      
      
      # HWS API Gateway Signature
      class HttpRequest:
          def __init__(self, method="", url="", headers=None, body=""):
              self.method = method
              spl = url.split("://", 1)
              scheme = 'http'
              if len(spl) > 1:
                  scheme = spl[0]
                  url = spl[1]
              query = {}
              spl = url.split('?', 1)
              url = spl[0]
              if len(spl) > 1:
                  for kv in spl[1].split("&"):
                      spl = kv.split("=", 1)
                      key = spl[0]
                      value = ""
                      if len(spl) > 1:
                          value = spl[1]
                      if key != '':
                          key = unquote(key)
                          value = unquote(value)
                          if key in query:
                              query[key].append(value)
                          else:
                              query[key] = [value]
              spl = url.split('/', 1)
              host = spl[0]
              if len(spl) > 1:
                  url = '/' + spl[1]
              else:
                  url = '/'
      
              self.scheme = scheme
              self.host = host
              self.uri = url
              self.query = query
              if headers is None:
                  self.headers = {}
              else:
                  self.headers = copy.deepcopy(headers)
              if sys.version_info.major < 3:
                  self.body = body
              else:
                  self.body = body.encode("utf-8")
      
      
      BasicDateFormat = "%Y%m%dT%H%M%SZ"
      Algorithm = "SDK-HMAC-SHA256"
      HeaderXDate = "X-Sdk-Date"
      HeaderHost = "host"
      HeaderAuthorization = "Authorization"
      HeaderContentSha256 = "x-sdk-content-sha256"
      
      
      # Build a CanonicalRequest from a regular request string
      #
      # CanonicalRequest =
      #  HTTPRequestMethod + '\n' +
      #  CanonicalURI + '\n' +
      #  CanonicalQueryString + '\n' +
      #  CanonicalHeaders + '\n' +
      #  SignedHeaders + '\n' +
      #  HexEncode(Hash(RequestPayload))
      def CanonicalRequest(r, signedHeaders):
          canonicalHeaders = CanonicalHeaders(r, signedHeaders)
          hexencode = findHeader(r, HeaderContentSha256)
          if hexencode is None:
              hexencode = HexEncodeSHA256Hash(r.body)
          return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r),
                                             canonicalHeaders, ";".join(signedHeaders), hexencode)
      
      
      def CanonicalURI(r):
          pattens = unquote(r.uri).split('/')
          uri = []
          for v in pattens:
              uri.append(urlencode(v))
          urlpath = "/".join(uri)
          if urlpath[-1] != '/':
              urlpath = urlpath + "/"  # always end with /
          # r.uri = urlpath
          return urlpath
      
      
      def CanonicalQueryString(r):
          keys = []
          for key in r.query:
              keys.append(key)
          keys.sort()
          a = []
          for key in keys:
              k = urlencode(key)
              value = r.query[key]
              if type(value) is list:
                  value.sort()
                  for v in value:
                      kv = k + "=" + urlencode(str(v))
                      a.append(kv)
              else:
                  kv = k + "=" + urlencode(str(value))
                  a.append(kv)
          return '&'.join(a)
      
      
      def CanonicalHeaders(r, signedHeaders):
          a = []
          __headers = {}
          for key in r.headers:
              keyEncoded = key.lower()
              value = r.headers[key]
              valueEncoded = value.strip()
              __headers[keyEncoded] = valueEncoded
              if sys.version_info.major == 3:
                  r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1')
          for key in signedHeaders:
              a.append(key + ":" + __headers[key])
          return '\n'.join(a) + "\n"
      
      
      def SignedHeaders(r):
          a = []
          for key in r.headers:
              a.append(key.lower())
          a.sort()
          return a
      
      
      # Create the HWS Signature.
      def SignStringToSign(stringToSign, signingKey):
          hm = hmacsha256(signingKey, stringToSign)
          return binascii.hexlify(hm).decode()
      
      
      # Get the finalized value for the "Authorization" header.  The signature
      # parameter is the output from SignStringToSign
      def AuthHeaderValue(signature, AppKey, signedHeaders):
          return "%s Access=%s, SignedHeaders=%s, Signature=%s" % (
              Algorithm, AppKey, ";".join(signedHeaders), signature)
      
      
      class Signer:
          def __init__(self):
              self.Key = ""
              self.Secret = ""
      
          def Verify(self, r, authorization):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  return False
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              return authorization == SignStringToSign(stringToSign, self.Secret)
      
          # SignRequest set Authorization header
          def Sign(self, r):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  t = datetime.utcnow()
                  r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat)
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              haveHost = False
              for key in r.headers:
                  if key.lower() == 'host':
                      haveHost = True
                      break
              if not haveHost:
                  r.headers["host"] = r.host
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              signature = SignStringToSign(stringToSign, self.Secret)
              authValue = AuthHeaderValue(signature, self.Key, signedHeaders)
              r.headers[HeaderAuthorization] = authValue
              r.headers["content-length"] = str(len(r.body))
              queryString = CanonicalQueryString(r)
              if queryString != "":
                  r.uri = r.uri + "?" + queryString
      

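  4. After the function is created, go to the function details page. On the Code tab, modify the following settings in index.py:

    • LOG_BUCKET = "***oss log bucket***": the OSS bucket that records which objects were or were not sent to OMS. It must not be the source bucket.
    • OSS_ENDPOINT = "***oss endpoint***": the OSS endpoint used to access the log bucket.
    • HW_AK = "***Access Key***": the Huawei Cloud access key (AK). Use a low-privilege key that only has IAM/OMS permissions.
    • HW_SK = "***Secret Access Key***": the secret access key (SK) that pairs with HW_AK.
    • SYNC_URL = "***Synchronization Request Receiving Address***": the synchronization request receiving address shown in the synchronization task details.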

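  5. On the Configuration tab, choose Triggers on the left and click Create Trigger. Create an OSS event trigger using the settings in Table 2, and click OK to save it.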

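    Table 2 Trigger parameter settings

    Parameter

    Description

    Trigger Type

    Select Object Storage Service (OSS).

    Name

    User-defined.

    Version or Alias

    Keep the default.

    Bucket Name

    Select the source bucket. It must be the same bucket as the source bucket selected in the synchronization task.

    Trigger Event

    Keep the default.

    Role Name

    Keep the default.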

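  6. After the OSS trigger is created, open the Test tab, select Object Storage OSS as the event template, enter a custom event name, and click Test Function. A result of true indicates success. If the call fails, check the parameter settings and the Function Compute permission settings.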
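signer.py in step 3 implements the SDK-HMAC-SHA256 signing used by Huawei Cloud API Gateway (APIG). For a JSON POST without a query string, which is what index.py sends, the signature reduces to the standard-library steps below. This is a minimal sketch for understanding or debugging signatures, not a replacement for signer.py. It assumes the request path contains only unreserved characters and that no x-sdk-content-sha256 header is set; the function name sign_post_json and the example host, path and credentials are hypothetical. The request that is actually sent must carry the same content-type, host and X-Sdk-Date values that were signed.

```python
import hashlib
import hmac
from datetime import datetime, timezone

ALGORITHM = "SDK-HMAC-SHA256"
DATE_FORMAT = "%Y%m%dT%H%M%SZ"  # same BasicDateFormat as signer.py


def sign_post_json(ak, sk, host, uri, body, sdk_date):
    """Return the Authorization header value for a JSON POST without a query string.

    Follows the steps in signer.py for the headers index.py ends up signing:
    content-type, host and x-sdk-date. Assumes `uri` contains only unreserved
    characters, so no percent-encoding is applied.
    """
    headers = {"content-type": "application/json", "host": host, "x-sdk-date": sdk_date}
    signed_headers = sorted(headers)  # lower-case header names in sorted order
    canonical_headers = "".join("{0}:{1}\n".format(k, headers[k].strip()) for k in signed_headers)
    canonical_uri = uri if uri.endswith("/") else uri + "/"  # signer.py always ends with "/"
    payload_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
    canonical_request = "\n".join([
        "POST", canonical_uri, "",  # "" is the empty canonical query string
        canonical_headers, ";".join(signed_headers), payload_hash,
    ])
    string_to_sign = "\n".join([
        ALGORITHM, sdk_date,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])
    signature = hmac.new(sk.encode("utf-8"), string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()
    return "{0} Access={1}, SignedHeaders={2}, Signature={3}".format(
        ALGORITHM, ak, ";".join(signed_headers), signature)


if __name__ == "__main__":
    # Placeholder credentials and host; X-Sdk-Date must be the current UTC time.
    now = datetime.now(timezone.utc).strftime(DATE_FORMAT)
    print(sign_post_json("my-ak", "my-sk", "example.invalid", "/v1/sync",
                         '{"object_keys": ["a.txt"]}', now))
```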