更新时间:2024-10-25 GMT+08:00
腾讯云COS配置同步请求
背景信息
创建同步任务后,您需要在源端配置同步请求,以确保源端云服务商的对象存储服务可以针对源端新增、修改对象实时调用OMS同步接口(例如,通过消息通知或函数计算服务方式),以完成对源端新增、修改对象数据的同步迁移。
配置条件
- 实时获取源端新增、修改对象名称。
- 调用OMS同步接口,将源端变化对象传递给对象存储迁移服务 OMS,从而完成源端同步迁移。
配置方式
基于源端云服务商的对象存储服务和函数工作流服务,通过配置源端云服务商的对象存储服务和函数工作流服务来触发新增、修改对象的同步请求。
操作步骤
- 登录腾讯云函数服务 Serverless 控制台,在左侧导航栏,单击函数服务。
- 在页面上方选择地域,单击新建,进入新建页面。
创建的函数需要与源端桶在同一区域。
- 在新建函数页面,选择“从头开始”。
- 函数类型:选择“事件函数”
- 函数名称:默认填充,可根据需要自行修改。
- 运行环境:选择Python3.6。
- 函数代码提交方式选择“本地上传zip包”,将以下两个代码文件index.py和signer.py打包为至一个zip包并上传。
- index.py
# -*- coding: utf8 -*- from qcloud_cos_v5 import CosConfig from qcloud_cos_v5 import CosS3Client import requests import os import json import logging import signer # 腾讯云侧信息 # 日志桶不可以是源端桶 LOG_BUCKET = "***cos log bucket***" # 由 BucketName-APPID 构成 # 华为云侧信息 # 请使低权限ak/sk,该ak/sk仅需要iam/oms权限 HW_AK = "***Access Key***" HW_SK = "***Secret Access Key***" # 同步请求接收地址,见同步任务详情 SYNC_URL = "***Synchronization Request Receiving Address***" def main_handler(event, context): logger = logging.getLogger() # 初始化cos授权 # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://www.tencentcloud.com/zh/document/product/598/32675 secret_id = "***secret_id***" # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://www.tencentcloud.com/zh/document/product/598/32675 secret_key = "***secret_key***" # 替换为用户的 region,已创建桶归属的region可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket region = 'ap-beijing' # COS支持的所有region列表参见https://www.tencentcloud.com/zh/document/product/436/6224 token = None # 如果使用永久密钥不需要填入token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见https://www.tencentcloud.com/zh/document/product/436/14048 scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填 config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) cos_log_client = CosS3Client(config) # 解析cos事件同步到oms服务 send_oms_result = False success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2]) failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2]) evt = event["Records"][0] object_key = evt["cos"]["cosObject"]["key"] try: logger.info("start sync to oms!") resp = send_oms(object_key) if resp.status_code == 200: logger.info("call oms api success, object_key: {0}".format(object_key)) record_list(logger, cos_log_client, success_prefix, object_key) send_oms_result = True else: logger.error( "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content))) record_list(logger, cos_log_client, failed_prefix, object_key) except Exception as e: logger.error("Internet error, call oms api failed!") record_list(logger, cos_log_client, failed_prefix, object_key) return send_oms_result def send_oms(object_key): """ apig鉴权后,将cos桶内变更对象信息发送给oms服务,apig鉴权具体方法参考:(https://support.huaweicloud.com/intl/zh-cn/devg-apisign/api-sign-sdk-python.html) :param object_key: 源端新增、修改对象名称 :return: response对象 """ sig = signer.Signer() sig.Key = HW_AK sig.Secret = HW_SK url = SYNC_URL body = json.dumps({"object_keys": [object_key]}) r = signer.HttpRequest("POST", url) r.headers = {"content-type": "application/json"} r.body = body sig.Sign(r) resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body) return resp def record_list(logger, log_bucket_client, prefix, object_key): """ 记录发送到oms成功和失败的对象. :param log_bucket_client: 日志桶客户端 :param prefix: 日志前缀 :param object_key: 成功或失败对象名称 :return: None """ record_key = prefix + object_key try: resp = log_bucket_client.put_object( Bucket=LOG_BUCKET, Body=b'', Key=record_key ) if resp.status == 200: logger.info("record success or failed object success.") else: logger.error("record success or failed object failed, object_key: {0}".format(object_key)) except: logger.error("record success or failed object failed, object_key: {0}".format(object_key))
- signer.py
import copy import sys import hashlib import hmac import binascii from datetime import datetime if sys.version_info.major < 3: from urllib import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) else: from urllib.parse import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8')) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) def urlencode(s): return quote(s, safe='~') def findHeader(r, header): for k in r.headers: if k.lower() == header.lower(): return r.headers[k] return None # HexEncodeSHA256Hash returns hexcode of sha256 def HexEncodeSHA256Hash(data): sha256 = hashlib.sha256() sha256.update(data) return sha256.hexdigest() # HWS API Gateway Signature class HttpRequest: def __init__(self, method="", url="", headers=None, body=""): self.method = method spl = url.split("://", 1) scheme = 'http' if len(spl) > 1: scheme = spl[0] url = spl[1] query = {} spl = url.split('?', 1) url = spl[0] if len(spl) > 1: for kv in spl[1].split("&"): spl = kv.split("=", 1) key = spl[0] value = "" if len(spl) > 1: value = spl[1] if key != '': key = unquote(key) value = unquote(value) if key in query: query[key].append(value) else: query[key] = [value] spl = url.split('/', 1) host = spl[0] if len(spl) > 1: url = '/' + spl[1] else: url = '/' self.scheme = scheme self.host = host self.uri = url self.query = query if headers is None: self.headers = {} else: self.headers = copy.deepcopy(headers) if sys.version_info.major < 3: self.body = body else: self.body = body.encode("utf-8") BasicDateFormat = "%Y%m%dT%H%M%SZ" Algorithm = "SDK-HMAC-SHA256" HeaderXDate = "X-Sdk-Date" HeaderHost = "host" HeaderAuthorization = "Authorization" HeaderContentSha256 = "x-sdk-content-sha256" # Build a CanonicalRequest from a regular request string # # CanonicalRequest = # HTTPRequestMethod + '\n' + # CanonicalURI + '\n' + # CanonicalQueryString + '\n' + # CanonicalHeaders + '\n' + # SignedHeaders + '\n' + # HexEncode(Hash(RequestPayload)) def CanonicalRequest(r, signedHeaders): canonicalHeaders = CanonicalHeaders(r, signedHeaders) hexencode = findHeader(r, HeaderContentSha256) if hexencode is None: hexencode = HexEncodeSHA256Hash(r.body) return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r), canonicalHeaders, ";".join(signedHeaders), hexencode) def CanonicalURI(r): pattens = unquote(r.uri).split('/') uri = [] for v in pattens: uri.append(urlencode(v)) urlpath = "/".join(uri) if urlpath[-1] != '/': urlpath = urlpath + "/" # always end with / # r.uri = urlpath return urlpath def CanonicalQueryString(r): keys = [] for key in r.query: keys.append(key) keys.sort() a = [] for key in keys: k = urlencode(key) value = r.query[key] if type(value) is list: value.sort() for v in value: kv = k + "=" + urlencode(str(v)) a.append(kv) else: kv = k + "=" + urlencode(str(value)) a.append(kv) return '&'.join(a) def CanonicalHeaders(r, signedHeaders): a = [] __headers = {} for key in r.headers: keyEncoded = key.lower() value = r.headers[key] valueEncoded = value.strip() __headers[keyEncoded] = valueEncoded if sys.version_info.major == 3: r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1') for key in signedHeaders: a.append(key + ":" + __headers[key]) return '\n'.join(a) + "\n" def SignedHeaders(r): a = [] for key in r.headers: a.append(key.lower()) a.sort() return a # Create the HWS Signature. def SignStringToSign(stringToSign, signingKey): hm = hmacsha256(signingKey, stringToSign) return binascii.hexlify(hm).decode() # Get the finalized value for the "Authorization" header. The signature # parameter is the output from SignStringToSign def AuthHeaderValue(signature, AppKey, signedHeaders): return "%s Access=%s, SignedHeaders=%s, Signature=%s" % ( Algorithm, AppKey, ";".join(signedHeaders), signature) class Signer: def __init__(self): self.Key = "" self.Secret = "" def Verify(self, r, authorization): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: return False else: t = datetime.strptime(headerTime, BasicDateFormat) signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) return authorization == SignStringToSign(stringToSign, self.Secret) # SignRequest set Authorization header def Sign(self, r): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: t = datetime.utcnow() r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat) else: t = datetime.strptime(headerTime, BasicDateFormat) haveHost = False for key in r.headers: if key.lower() == 'host': haveHost = True break if not haveHost: r.headers["host"] = r.host signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) signature = SignStringToSign(stringToSign, self.Secret) authValue = AuthHeaderValue(signature, self.Key, signedHeaders) r.headers[HeaderAuthorization] = authValue r.headers["content-length"] = str(len(r.body)) queryString = CanonicalQueryString(r) if queryString != "": r.uri = r.uri + "?" + queryString
- index.py
- (可选)展开“高级配置”,您可以根据实际需求修改环境配置、权限配置、层配置、网络配置等,详情参见函数相关配置。
- 配置触发器,在“创建触发器”中选择自定义创建。
- 触发方式:选择“COS触发”。
- COS Bucket:选择同步任务的源端桶。
- 事件类型:选择“全部创建”。
- 单击“完成”,创建函数完成,您可以在函数服务页面查看已创建的函数。
- 单击函数名称,进入“函数管理”页面,选择函数代码页签,修改index.py中的以下配置项。
- HW_AK = "***Access Key***"
- HW_SK = "***Secret Access Key***"
- SYNC_URL = "***Synchronization Request Receiving Address***"
- secret_id = "***secret_id***"
- secret_key = "***secret_key***"
- region = 'ap-beijing'
- 选择测试事件(COS 对象存储的 PUT 事件模板、COS 对象存储的 POST 事件模板),单击“测试”。
- 在返回结果为true表示测试成功。若测试失败,请检查参数配置和云函数权限配置。
父主题: 各云服务商配置同步请求教程