更新时间:2024-10-25 GMT+08:00

腾讯云COS配置同步请求

背景信息

创建同步任务后,您需要在源端配置同步请求,以确保源端云服务商的对象存储服务可以针对源端新增、修改对象实时调用OMS同步接口(例如,通过消息通知或函数计算服务方式),以完成对源端新增、修改对象数据的同步迁移。

配置条件

  • 实时获取源端新增、修改对象名称。
  • 调用OMS同步接口,将源端变化对象传递给对象存储迁移服务 OMS,从而完成源端同步迁移。

配置方式

基于源端云服务商的对象存储服务和函数工作流服务,通过配置源端云服务商的对象存储服务和函数工作流服务来触发新增、修改对象的同步请求。

操作步骤

  1. 登录腾讯云函数服务 Serverless 控制台,在左侧导航栏,单击函数服务
  2. 在页面上方选择地域,单击新建,进入新建页面。

    创建的函数需要与源端桶在同一区域。

  3. 新建函数页面,选择“从头开始”。

    • 函数类型:选择“事件函数
    • 函数名称:默认填充,可根据需要自行修改。
    • 运行环境:选择Python3.6。

  4. 函数代码提交方式选择“本地上传zip包”,将以下两个代码文件index.pysigner.py打包为至一个zip包并上传。

    • index.py
      # -*- coding: utf8 -*-
      
      
      from qcloud_cos_v5 import CosConfig
      from qcloud_cos_v5 import CosS3Client
      import requests
      import os
      import json
      import logging
      
      import signer
      
      # 腾讯云侧信息
      # 日志桶不可以是源端桶
      LOG_BUCKET = "***cos log bucket***"  # 由 BucketName-APPID 构成
      
      # 华为云侧信息
      # 请使低权限ak/sk,该ak/sk仅需要iam/oms权限
      HW_AK = "***Access Key***"
      HW_SK = "***Secret Access Key***"
      # 同步请求接收地址,见同步任务详情
      SYNC_URL = "***Synchronization Request Receiving Address***"
      
      
      def main_handler(event, context):
          logger = logging.getLogger()
      
          # 初始化cos授权
          # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://www.tencentcloud.com/zh/document/product/598/32675
          secret_id = "***secret_id***"
          # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://www.tencentcloud.com/zh/document/product/598/32675
          secret_key = "***secret_key***"
          # 替换为用户的 region,已创建桶归属的region可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket
          region = 'ap-beijing'  
          # COS支持的所有region列表参见https://www.tencentcloud.com/zh/document/product/436/6224
          token = None  # 如果使用永久密钥不需要填入token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见https://www.tencentcloud.com/zh/document/product/436/14048
          scheme = 'https'  # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
      
          config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
          cos_log_client = CosS3Client(config)
      
          # 解析cos事件同步到oms服务
          send_oms_result = False
          success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2])
          failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2])
          evt = event["Records"][0]
          object_key = evt["cos"]["cosObject"]["key"]
          try:
              logger.info("start sync to oms!")
              resp = send_oms(object_key)
              if resp.status_code == 200:
                  logger.info("call oms api success, object_key: {0}".format(object_key))
                  record_list(logger, cos_log_client, success_prefix, object_key)
                  send_oms_result = True
              else:
                  logger.error(
                      "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content)))
                  record_list(logger, cos_log_client, failed_prefix, object_key)
          except Exception as e:
              logger.error("Internet error, call oms api failed!")
              record_list(logger, cos_log_client, failed_prefix, object_key)
          return send_oms_result
      
      
      def send_oms(object_key):
          """
          apig鉴权后,将cos桶内变更对象信息发送给oms服务,apig鉴权具体方法参考:(https://support.huaweicloud.com/intl/zh-cn/devg-apisign/api-sign-sdk-python.html)
          :param object_key: 源端新增、修改对象名称
          :return: response对象
          """
          sig = signer.Signer()
          sig.Key = HW_AK
          sig.Secret = HW_SK
      
          url = SYNC_URL
          body = json.dumps({"object_keys": [object_key]})
          r = signer.HttpRequest("POST", url)
          r.headers = {"content-type": "application/json"}
          r.body = body
          sig.Sign(r)
          resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
          return resp
      
      
      def record_list(logger, log_bucket_client, prefix, object_key):
          """
          记录发送到oms成功和失败的对象.
          :param log_bucket_client: 日志桶客户端
          :param prefix: 日志前缀
          :param object_key: 成功或失败对象名称
          :return: None
          """
          record_key = prefix + object_key
          try:
              resp = log_bucket_client.put_object(
                  Bucket=LOG_BUCKET,
                  Body=b'',
                  Key=record_key
              )
              if resp.status == 200:
                  logger.info("record success or failed object success.")
              else:
                  logger.error("record success or failed object failed, object_key: {0}".format(object_key))
          except:
              logger.error("record success or failed object failed, object_key: {0}".format(object_key))
    • signer.py
      import copy
      import sys
      import hashlib
      import hmac
      import binascii
      from datetime import datetime
      
      if sys.version_info.major < 3:
          from urllib import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest)
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      else:
          from urllib.parse import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8'))
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      
      def urlencode(s):
          return quote(s, safe='~')
      
      
      def findHeader(r, header):
          for k in r.headers:
              if k.lower() == header.lower():
                  return r.headers[k]
          return None
      
      
      # HexEncodeSHA256Hash returns hexcode of sha256
      def HexEncodeSHA256Hash(data):
          sha256 = hashlib.sha256()
          sha256.update(data)
          return sha256.hexdigest()
      
      
      # HWS API Gateway Signature
      class HttpRequest:
          def __init__(self, method="", url="", headers=None, body=""):
              self.method = method
              spl = url.split("://", 1)
              scheme = 'http'
              if len(spl) > 1:
                  scheme = spl[0]
                  url = spl[1]
              query = {}
              spl = url.split('?', 1)
              url = spl[0]
              if len(spl) > 1:
                  for kv in spl[1].split("&"):
                      spl = kv.split("=", 1)
                      key = spl[0]
                      value = ""
                      if len(spl) > 1:
                          value = spl[1]
                      if key != '':
                          key = unquote(key)
                          value = unquote(value)
                          if key in query:
                              query[key].append(value)
                          else:
                              query[key] = [value]
              spl = url.split('/', 1)
              host = spl[0]
              if len(spl) > 1:
                  url = '/' + spl[1]
              else:
                  url = '/'
      
              self.scheme = scheme
              self.host = host
              self.uri = url
              self.query = query
              if headers is None:
                  self.headers = {}
              else:
                  self.headers = copy.deepcopy(headers)
              if sys.version_info.major < 3:
                  self.body = body
              else:
                  self.body = body.encode("utf-8")
      
      
      BasicDateFormat = "%Y%m%dT%H%M%SZ"
      Algorithm = "SDK-HMAC-SHA256"
      HeaderXDate = "X-Sdk-Date"
      HeaderHost = "host"
      HeaderAuthorization = "Authorization"
      HeaderContentSha256 = "x-sdk-content-sha256"
      
      
      # Build a CanonicalRequest from a regular request string
      #
      # CanonicalRequest =
      #  HTTPRequestMethod + '\n' +
      #  CanonicalURI + '\n' +
      #  CanonicalQueryString + '\n' +
      #  CanonicalHeaders + '\n' +
      #  SignedHeaders + '\n' +
      #  HexEncode(Hash(RequestPayload))
      def CanonicalRequest(r, signedHeaders):
          canonicalHeaders = CanonicalHeaders(r, signedHeaders)
          hexencode = findHeader(r, HeaderContentSha256)
          if hexencode is None:
              hexencode = HexEncodeSHA256Hash(r.body)
          return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r),
                                             canonicalHeaders, ";".join(signedHeaders), hexencode)
      
      
      def CanonicalURI(r):
          pattens = unquote(r.uri).split('/')
          uri = []
          for v in pattens:
              uri.append(urlencode(v))
          urlpath = "/".join(uri)
          if urlpath[-1] != '/':
              urlpath = urlpath + "/"  # always end with /
          # r.uri = urlpath
          return urlpath
      
      
      def CanonicalQueryString(r):
          keys = []
          for key in r.query:
              keys.append(key)
          keys.sort()
          a = []
          for key in keys:
              k = urlencode(key)
              value = r.query[key]
              if type(value) is list:
                  value.sort()
                  for v in value:
                      kv = k + "=" + urlencode(str(v))
                      a.append(kv)
              else:
                  kv = k + "=" + urlencode(str(value))
                  a.append(kv)
          return '&'.join(a)
      
      
      def CanonicalHeaders(r, signedHeaders):
          a = []
          __headers = {}
          for key in r.headers:
              keyEncoded = key.lower()
              value = r.headers[key]
              valueEncoded = value.strip()
              __headers[keyEncoded] = valueEncoded
              if sys.version_info.major == 3:
                  r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1')
          for key in signedHeaders:
              a.append(key + ":" + __headers[key])
          return '\n'.join(a) + "\n"
      
      
      def SignedHeaders(r):
          a = []
          for key in r.headers:
              a.append(key.lower())
          a.sort()
          return a
      
      
      # Create the HWS Signature.
      def SignStringToSign(stringToSign, signingKey):
          hm = hmacsha256(signingKey, stringToSign)
          return binascii.hexlify(hm).decode()
      
      
      # Get the finalized value for the "Authorization" header.  The signature
      # parameter is the output from SignStringToSign
      def AuthHeaderValue(signature, AppKey, signedHeaders):
          return "%s Access=%s, SignedHeaders=%s, Signature=%s" % (
              Algorithm, AppKey, ";".join(signedHeaders), signature)
      
      
      class Signer:
          def __init__(self):
              self.Key = ""
              self.Secret = ""
      
          def Verify(self, r, authorization):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  return False
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              return authorization == SignStringToSign(stringToSign, self.Secret)
      
          # SignRequest set Authorization header
          def Sign(self, r):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  t = datetime.utcnow()
                  r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat)
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              haveHost = False
              for key in r.headers:
                  if key.lower() == 'host':
                      haveHost = True
                      break
              if not haveHost:
                  r.headers["host"] = r.host
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              signature = SignStringToSign(stringToSign, self.Secret)
              authValue = AuthHeaderValue(signature, self.Key, signedHeaders)
              r.headers[HeaderAuthorization] = authValue
              r.headers["content-length"] = str(len(r.body))
              queryString = CanonicalQueryString(r)
              if queryString != "":
                  r.uri = r.uri + "?" + queryString

  5. (可选)展开“高级配置”,您可以根据实际需求修改环境配置、权限配置、层配置、网络配置等,详情参见函数相关配置
  6. 配置触发器,在“创建触发器”中选择自定义创建

    • 触发方式:选择“COS触发”。
    • COS Bucket:选择同步任务的源端桶。
    • 事件类型:选择“全部创建”。

  7. 单击“完成”,创建函数完成,您可以在函数服务页面查看已创建的函数。
  8. 单击函数名称,进入“函数管理”页面,选择函数代码页签,修改index.py中的以下配置项。

    • HW_AK = "***Access Key***"
    • HW_SK = "***Secret Access Key***"
    • SYNC_URL = "***Synchronization Request Receiving Address***"
    • secret_id = "***secret_id***"
    • secret_key = "***secret_key***"
    • region = 'ap-beijing'

  9. 选择测试事件(COS 对象存储的 PUT 事件模板、COS 对象存储的 POST 事件模板),单击“测试”。

  10. 在返回结果为true表示测试成功。若测试失败,请检查参数配置和云函数权限配置。