Configuring Tencent Cloud COS to Send Synchronization Requests
Background
After you create a synchronization task, you need to configure the source object storage service to request OMS to migrate newly added and modified source objects. To do so, you can use a message notification or function service on the source platform.
Desired Effects
- The names of newly added and modified objects in the source bucket can be obtained in real time.
- The OMS synchronization API can be invoked to migrate the newly added and modified objects to the destination bucket.
Configuration Methods
The following describes how to use the function service on the source platform to send synchronization requests.
Procedure
- Log in to the Tencent Cloud Serverless Cloud Function console. In the left navigation pane, choose Function Service.
- Select a region at the top of the page and click Create.
The created function must be in the same region as the source bucket.
- On the Create Function page, select Create from scratch.
- Function type: Select Event-triggered function.
- Function name: The function name is automatically populated by default and can be modified as needed.
- Runtime environment: Choose Python 3.6
- Select Local ZIP file for Submitting method. Compress the index.py and signer.py files into a .zip package, and upload the package.
- index.py
# -*- coding: utf8 -*- from qcloud_cos_v5 import CosConfig from qcloud_cos_v5 import CosS3Client import requests import os import json import logging import signer # Tencent Cloud information # Do not use the source bucket as the log bucket. LOG_BUCKET = "***cos log bucket***" # Consists of a bucket name and APPID. # Huawei Cloud information #Use the AK/SK pair of an account that has only the required IAM and OMS permissions. HW_AK = "***Access Key***" HW_SK = "***Secret Access Key***" # The address for receiving synchronization requests. Get it from the synchronization task details. SYNC_URL = "***Synchronization Request Receiving Address***" def main_handler(event, context): logger = logging.getLogger() # Initialize a COS client instance. # Secret ID of a user. You are advised to use the member account key and grant the minimum permissions to reduce risks. For details about how to obtain the member account key, see https://www.tencentcloud.com/en/document/product/598/32675. secret_id = "***secret_id***" # Secret Key of the user. You are advised to use the member account key and grant the minimum permissions to reduce risks. For details about how to obtain the member account key, see https://www.tencentcloud.com/en/document/product/598/32675. secret_key = "***secret_key***" # Replace it with the source region. Get it from https://console.cloud.tencent.com/cos5/bucket. region = 'ap-beijing' #Regions supported by COS. See https://www.tencentcloud.com/en/document/product/436/6224. token = None #If a permanent key is used, you do not need to enter the token. If a temporary key is used, you need to enter the token. For details about how to generate and use a temporary key, see https://www.tencentcloud.com/en/document/product/436/14048. scheme = 'https' #Specify the protocol used to access COS. The value can be http or https. The default value is https. This parameter is optional. config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) cos_log_client = CosS3Client(config) # Parse COS events and synchronize them to OMS. send_oms_result = False success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2]) failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2]) evt = event["Records"][0] object_key = evt["cos"]["cosObject"]["key"] try: logger.info("start sync to oms!") resp = send_oms(object_key) if resp.status_code == 200: logger.info("call oms api success, object_key: {0}".format(object_key)) record_list(logger, cos_log_client, success_prefix, object_key) send_oms_result = True else: logger.error( "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content))) record_list(logger, cos_log_client, failed_prefix, object_key) except Exception as e: logger.error("Internet error, call oms api failed!") record_list(logger, cos_log_client, failed_prefix, object_key) return send_oms_result def send_oms(object_key): """ After APIG authentication succeeds, the changed object information in the COS bucket is sent to OMS. For details about API Gateway authentication, see https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-sdk-python.html. :param object_key: The name of a new or modified object. :return: The response result. """ sig = signer.Signer() sig.Key = HW_AK sig.Secret = HW_SK url = SYNC_URL body = json.dumps({"object_keys": [object_key]}) r = signer.HttpRequest("POST", url) r.headers = {"content-type": "application/json"} r.body = body sig.Sign(r) resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body) return resp def record_list(logger, log_bucket_client, prefix, object_key): """ Record the objects that are successfully migrated to or fail to be migrated to OMS. :param log_bucket_client: The log bucket client. :param prefix: Prefixes of the requested and unrequested objects. :param object_key: The names of requested and unrequested objects. :return: None """ record_key = prefix + object_key try: resp = log_bucket_client.put_object( Bucket=LOG_BUCKET, Body=b'', Key=record_key ) if resp.status == 200: logger.info("record success or failed object success.") else: logger.error("record success or failed object failed, object_key: {0}".format(object_key)) except: logger.error("record success or failed object failed, object_key: {0}".format(object_key))
- signer.py
import copy import sys import hashlib import hmac import binascii from datetime import datetime if sys.version_info.major < 3: from urllib import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) else: from urllib.parse import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8')) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) def urlencode(s): return quote(s, safe='~') def findHeader(r, header): for k in r.headers: if k.lower() == header.lower(): return r.headers[k] return None # HexEncodeSHA256Hash returns hexcode of sha256 def HexEncodeSHA256Hash(data): sha256 = hashlib.sha256() sha256.update(data) return sha256.hexdigest() # HWS API Gateway Signature class HttpRequest: def __init__(self, method="", url="", headers=None, body=""): self.method = method spl = url.split("://", 1) scheme = 'http' if len(spl) > 1: scheme = spl[0] url = spl[1] query = {} spl = url.split('?', 1) url = spl[0] if len(spl) > 1: for kv in spl[1].split("&"): spl = kv.split("=", 1) key = spl[0] value = "" if len(spl) > 1: value = spl[1] if key != '': key = unquote(key) value = unquote(value) if key in query: query[key].append(value) else: query[key] = [value] spl = url.split('/', 1) host = spl[0] if len(spl) > 1: url = '/' + spl[1] else: url = '/' self.scheme = scheme self.host = host self.uri = url self.query = query if headers is None: self.headers = {} else: self.headers = copy.deepcopy(headers) if sys.version_info.major < 3: self.body = body else: self.body = body.encode("utf-8") BasicDateFormat = "%Y%m%dT%H%M%SZ" Algorithm = "SDK-HMAC-SHA256" HeaderXDate = "X-Sdk-Date" HeaderHost = "host" HeaderAuthorization = "Authorization" HeaderContentSha256 = "x-sdk-content-sha256" # Build a CanonicalRequest from a regular request string # # CanonicalRequest = # HTTPRequestMethod + '\n' + # CanonicalURI + '\n' + # CanonicalQueryString + '\n' + # CanonicalHeaders + '\n' + # SignedHeaders + '\n' + # HexEncode(Hash(RequestPayload)) def CanonicalRequest(r, signedHeaders): canonicalHeaders = CanonicalHeaders(r, signedHeaders) hexencode = findHeader(r, HeaderContentSha256) if hexencode is None: hexencode = HexEncodeSHA256Hash(r.body) return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r), canonicalHeaders, ";".join(signedHeaders), hexencode) def CanonicalURI(r): pattens = unquote(r.uri).split('/') uri = [] for v in pattens: uri.append(urlencode(v)) urlpath = "/".join(uri) if urlpath[-1] != '/': urlpath = urlpath + "/" # always end with / # r.uri = urlpath return urlpath def CanonicalQueryString(r): keys = [] for key in r.query: keys.append(key) keys.sort() a = [] for key in keys: k = urlencode(key) value = r.query[key] if type(value) is list: value.sort() for v in value: kv = k + "=" + urlencode(str(v)) a.append(kv) else: kv = k + "=" + urlencode(str(value)) a.append(kv) return '&'.join(a) def CanonicalHeaders(r, signedHeaders): a = [] __headers = {} for key in r.headers: keyEncoded = key.lower() value = r.headers[key] valueEncoded = value.strip() __headers[keyEncoded] = valueEncoded if sys.version_info.major == 3: r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1') for key in signedHeaders: a.append(key + ":" + __headers[key]) return '\n'.join(a) + "\n" def SignedHeaders(r): a = [] for key in r.headers: a.append(key.lower()) a.sort() return a # Create the HWS Signature. def SignStringToSign(stringToSign, signingKey): hm = hmacsha256(signingKey, stringToSign) return binascii.hexlify(hm).decode() # Get the finalized value for the "Authorization" header. The signature # parameter is the output from SignStringToSign def AuthHeaderValue(signature, AppKey, signedHeaders): return "%s Access=%s, SignedHeaders=%s, Signature=%s" % ( Algorithm, AppKey, ";".join(signedHeaders), signature) class Signer: def __init__(self): self.Key = "" self.Secret = "" def Verify(self, r, authorization): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: return False else: t = datetime.strptime(headerTime, BasicDateFormat) signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) return authorization == SignStringToSign(stringToSign, self.Secret) # SignRequest set Authorization header def Sign(self, r): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: t = datetime.utcnow() r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat) else: t = datetime.strptime(headerTime, BasicDateFormat) haveHost = False for key in r.headers: if key.lower() == 'host': haveHost = True break if not haveHost: r.headers["host"] = r.host signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) signature = SignStringToSign(stringToSign, self.Secret) authValue = AuthHeaderValue(signature, self.Key, signedHeaders) r.headers[HeaderAuthorization] = authValue r.headers["content-length"] = str(len(r.body)) queryString = CanonicalQueryString(r) if queryString != "": r.uri = r.uri + "?" + queryString
- index.py
- (Optional) In Advanced configuration, configure the environment, permission, layer, and network of the function as needed. For more information, see Function.
- Configure a trigger. Select Custom for Create trigger.
- Trigger method: Select COS triggering.
- COS Bucket: Select the source bucket in the synchronization task.
- Event Type: Select All Creation Events.
- Click Finish. The function is created. You can view the created function on the Function Service page.
- Click the name of the target function. On the Function Management page, click the Function Code tab, and modify the following configuration items in index.py:
- HW_AK = "***Access Key***"
- HW_SK = "***Secret Access Key***"
- SYNC_URL = "***Synchronization Request Receiving Address***"
- secret_id = "***secret_id***"
- secret_key = "***secret_key***"
- region = 'ap-beijing'
- Select a test event (COS PUT event template or COS POST event template) and click Test.
- If the returned result is true, the test is successful. If the test fails, check whether the function parameters are configured correctly and whether you have required permissions.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot