Updated on 2024-05-16 GMT+08:00

Configuring Tencent Cloud COS to Send Synchronization Requests

Background

After you create a synchronization task, you need to configure the source object storage service to request OMS to migrate newly added and modified source objects. To do so, you can use a message notification or function service on the source platform.

Desired Effects

  • The names of newly added and modified objects in the source bucket can be obtained in real time.
  • The OMS synchronization API can be invoked to migrate the newly added and modified objects to the destination bucket.

Configuration Methods

The following describes how to use the function service on the source platform to send synchronization requests.

Procedure

  1. Log in to the Tencent Cloud Serverless Cloud Function console. In the left navigation pane, choose Function Service.
  2. Select a region at the top of the page and click Create.

    The created function must be in the same region as the source bucket.

  3. On the Create Function page, select Create from scratch.

    • Function type: Select Event-triggered function.
    • Function name: The function name is automatically populated by default and can be modified as needed.
    • Runtime environment: Choose Python 3.6

  4. Select Local ZIP file for Submitting method. Compress the index.py and signer.py files into a .zip package, and upload the package.

    • index.py
      # -*- coding: utf8 -*-
      
      
      from qcloud_cos_v5 import CosConfig
      from qcloud_cos_v5 import CosS3Client
      import requests
      import os
      import json
      import logging
      
      import signer
      
      # Tencent Cloud information
      # Do not use the source bucket as the log bucket.
      LOG_BUCKET = "***cos log bucket***" # Consists of a bucket name and APPID.
      
      # Huawei Cloud information
      #Use the AK/SK pair of an account that has only the required IAM and OMS permissions.
      HW_AK = "***Access Key***"
      HW_SK = "***Secret Access Key***"
      # The address for receiving synchronization requests. Get it from the synchronization task details.
      SYNC_URL = "***Synchronization Request Receiving Address***"
      
      
      def main_handler(event, context):
          logger = logging.getLogger()
      
          # Initialize a COS client instance.
          # Secret ID of a user. You are advised to use the member account key and grant the minimum permissions to reduce risks. For details about how to obtain the member account key, see https://www.tencentcloud.com/en/document/product/598/32675.
          secret_id = "***secret_id***"
          # Secret Key of the user. You are advised to use the member account key and grant the minimum permissions to reduce risks. For details about how to obtain the member account key, see https://www.tencentcloud.com/en/document/product/598/32675.
          secret_key = "***secret_key***"
          # Replace it with the source region. Get it from https://console.cloud.tencent.com/cos5/bucket.
          region = 'ap-beijing'  
          #Regions supported by COS. See https://www.tencentcloud.com/en/document/product/436/6224.
          token = None #If a permanent key is used, you do not need to enter the token. If a temporary key is used, you need to enter the token. For details about how to generate and use a temporary key, see https://www.tencentcloud.com/en/document/product/436/14048.
          scheme = 'https' #Specify the protocol used to access COS. The value can be http or https. The default value is https. This parameter is optional.
      
          config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
          cos_log_client = CosS3Client(config)
      
          # Parse COS events and synchronize them to OMS.
          send_oms_result = False
          success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2])
          failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2])
          evt = event["Records"][0]
          object_key = evt["cos"]["cosObject"]["key"]
          try:
              logger.info("start sync to oms!")
              resp = send_oms(object_key)
              if resp.status_code == 200:
                  logger.info("call oms api success, object_key: {0}".format(object_key))
                  record_list(logger, cos_log_client, success_prefix, object_key)
                  send_oms_result = True
              else:
                  logger.error(
                      "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content)))
                  record_list(logger, cos_log_client, failed_prefix, object_key)
          except Exception as e:
              logger.error("Internet error, call oms api failed!")
              record_list(logger, cos_log_client, failed_prefix, object_key)
          return send_oms_result
      
      
      def send_oms(object_key):
          """
          After APIG authentication succeeds, the changed object information in the COS bucket is sent to OMS. For details about API Gateway authentication, see https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-sdk-python.html.
          :param object_key: The name of a new or modified object.
          :return: The response result.
          """
          sig = signer.Signer()
          sig.Key = HW_AK
          sig.Secret = HW_SK
      
          url = SYNC_URL
          body = json.dumps({"object_keys": [object_key]})
          r = signer.HttpRequest("POST", url)
          r.headers = {"content-type": "application/json"}
          r.body = body
          sig.Sign(r)
          resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
          return resp
      
      
      def record_list(logger, log_bucket_client, prefix, object_key):
          """
          Record the objects that are successfully migrated to or fail to be migrated to OMS.
          :param log_bucket_client: The log bucket client.
          :param prefix: Prefixes of the requested and unrequested objects.
          :param object_key: The names of requested and unrequested objects.
          :return: None
          """
          record_key = prefix + object_key
          try:
              resp = log_bucket_client.put_object(
                  Bucket=LOG_BUCKET,
                  Body=b'',
                  Key=record_key
              )
              if resp.status == 200:
                  logger.info("record success or failed object success.")
              else:
                  logger.error("record success or failed object failed, object_key: {0}".format(object_key))
          except:
              logger.error("record success or failed object failed, object_key: {0}".format(object_key))
    • signer.py
      import copy
      import sys
      import hashlib
      import hmac
      import binascii
      from datetime import datetime
      
      if sys.version_info.major < 3:
          from urllib import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest)
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      else:
          from urllib.parse import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8'))
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes)
      
      
      def urlencode(s):
          return quote(s, safe='~')
      
      
      def findHeader(r, header):
          for k in r.headers:
              if k.lower() == header.lower():
                  return r.headers[k]
          return None
      
      
      # HexEncodeSHA256Hash returns hexcode of sha256
      def HexEncodeSHA256Hash(data):
          sha256 = hashlib.sha256()
          sha256.update(data)
          return sha256.hexdigest()
      
      
      # HWS API Gateway Signature
      class HttpRequest:
          def __init__(self, method="", url="", headers=None, body=""):
              self.method = method
              spl = url.split("://", 1)
              scheme = 'http'
              if len(spl) > 1:
                  scheme = spl[0]
                  url = spl[1]
              query = {}
              spl = url.split('?', 1)
              url = spl[0]
              if len(spl) > 1:
                  for kv in spl[1].split("&"):
                      spl = kv.split("=", 1)
                      key = spl[0]
                      value = ""
                      if len(spl) > 1:
                          value = spl[1]
                      if key != '':
                          key = unquote(key)
                          value = unquote(value)
                          if key in query:
                              query[key].append(value)
                          else:
                              query[key] = [value]
              spl = url.split('/', 1)
              host = spl[0]
              if len(spl) > 1:
                  url = '/' + spl[1]
              else:
                  url = '/'
      
              self.scheme = scheme
              self.host = host
              self.uri = url
              self.query = query
              if headers is None:
                  self.headers = {}
              else:
                  self.headers = copy.deepcopy(headers)
              if sys.version_info.major < 3:
                  self.body = body
              else:
                  self.body = body.encode("utf-8")
      
      
      BasicDateFormat = "%Y%m%dT%H%M%SZ"
      Algorithm = "SDK-HMAC-SHA256"
      HeaderXDate = "X-Sdk-Date"
      HeaderHost = "host"
      HeaderAuthorization = "Authorization"
      HeaderContentSha256 = "x-sdk-content-sha256"
      
      
      # Build a CanonicalRequest from a regular request string
      #
      # CanonicalRequest =
      #  HTTPRequestMethod + '\n' +
      #  CanonicalURI + '\n' +
      #  CanonicalQueryString + '\n' +
      #  CanonicalHeaders + '\n' +
      #  SignedHeaders + '\n' +
      #  HexEncode(Hash(RequestPayload))
      def CanonicalRequest(r, signedHeaders):
          canonicalHeaders = CanonicalHeaders(r, signedHeaders)
          hexencode = findHeader(r, HeaderContentSha256)
          if hexencode is None:
              hexencode = HexEncodeSHA256Hash(r.body)
          return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r),
                                             canonicalHeaders, ";".join(signedHeaders), hexencode)
      
      
      def CanonicalURI(r):
          pattens = unquote(r.uri).split('/')
          uri = []
          for v in pattens:
              uri.append(urlencode(v))
          urlpath = "/".join(uri)
          if urlpath[-1] != '/':
              urlpath = urlpath + "/"  # always end with /
          # r.uri = urlpath
          return urlpath
      
      
      def CanonicalQueryString(r):
          keys = []
          for key in r.query:
              keys.append(key)
          keys.sort()
          a = []
          for key in keys:
              k = urlencode(key)
              value = r.query[key]
              if type(value) is list:
                  value.sort()
                  for v in value:
                      kv = k + "=" + urlencode(str(v))
                      a.append(kv)
              else:
                  kv = k + "=" + urlencode(str(value))
                  a.append(kv)
          return '&'.join(a)
      
      
      def CanonicalHeaders(r, signedHeaders):
          a = []
          __headers = {}
          for key in r.headers:
              keyEncoded = key.lower()
              value = r.headers[key]
              valueEncoded = value.strip()
              __headers[keyEncoded] = valueEncoded
              if sys.version_info.major == 3:
                  r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1')
          for key in signedHeaders:
              a.append(key + ":" + __headers[key])
          return '\n'.join(a) + "\n"
      
      
      def SignedHeaders(r):
          a = []
          for key in r.headers:
              a.append(key.lower())
          a.sort()
          return a
      
      
      # Create the HWS Signature.
      def SignStringToSign(stringToSign, signingKey):
          hm = hmacsha256(signingKey, stringToSign)
          return binascii.hexlify(hm).decode()
      
      
      # Get the finalized value for the "Authorization" header.  The signature
      # parameter is the output from SignStringToSign
      def AuthHeaderValue(signature, AppKey, signedHeaders):
          return "%s Access=%s, SignedHeaders=%s, Signature=%s" % (
              Algorithm, AppKey, ";".join(signedHeaders), signature)
      
      
      class Signer:
          def __init__(self):
              self.Key = ""
              self.Secret = ""
      
          def Verify(self, r, authorization):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  return False
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              return authorization == SignStringToSign(stringToSign, self.Secret)
      
          # SignRequest set Authorization header
          def Sign(self, r):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  t = datetime.utcnow()
                  r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat)
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              haveHost = False
              for key in r.headers:
                  if key.lower() == 'host':
                      haveHost = True
                      break
              if not haveHost:
                  r.headers["host"] = r.host
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              signature = SignStringToSign(stringToSign, self.Secret)
              authValue = AuthHeaderValue(signature, self.Key, signedHeaders)
              r.headers[HeaderAuthorization] = authValue
              r.headers["content-length"] = str(len(r.body))
              queryString = CanonicalQueryString(r)
              if queryString != "":
                  r.uri = r.uri + "?" + queryString

  5. (Optional) In Advanced configuration, configure the environment, permission, layer, and network of the function as needed. For more information, see Function.
  6. Configure a trigger. Select Custom for Create trigger.

    • Trigger method: Select COS triggering.
    • COS Bucket: Select the source bucket in the synchronization task.
    • Event Type: Select All Creation Events.

  7. Click Finish. The function is created. You can view the created function on the Function Service page.
  8. Click the name of the target function. On the Function Management page, click the Function Code tab, and modify the following configuration items in index.py:

    • HW_AK = "***Access Key***"
    • HW_SK = "***Secret Access Key***"
    • SYNC_URL = "***Synchronization Request Receiving Address***"
    • secret_id = "***secret_id***"
    • secret_key = "***secret_key***"
    • region = 'ap-beijing'

  9. Select a test event (COS PUT event template or COS POST event template) and click Test.

  10. If the returned result is true, the test is successful. If the test fails, check whether the function parameters are configured correctly and whether you have required permissions.