更新时间:2024-10-25 GMT+08:00
阿里云OSS配置同步请求
背景信息
创建同步任务后,您需要在源端配置同步请求,以确保源端云服务商的对象存储服务可以针对源端新增、修改对象实时调用OMS同步接口(例如,通过消息通知或函数计算服务方式),以完成对源端新增、修改对象数据的同步迁移。
配置条件
- 实时获取源端新增、修改对象名称。
- 调用OMS同步接口,将源端变化对象传递给对象存储迁移服务 OMS,从而完成源端同步迁移。
配置方式
基于源端云服务商的对象存储服务和函数工作流服务,通过配置源端云服务商的对象存储服务和函数工作流服务来触发新增、修改对象的同步请求。
操作步骤
- 登录阿里云函数计算控制台,在左侧导航栏,单击函数。
- 在顶部菜单栏,选择地域,然后在函数页面,单击创建函数。
创建的函数需要与源端桶在同一区域。
- 在创建函数页面,选择事件函数,配置以下参数,然后单击创建。
表1 参数配置说明 参数
配置说明
函数名称
用户自定义。
运行环境
选择Python3。
代码上传方式
- index.py
# -*- coding: utf-8 -*- import logging import json import requests import oss2 import signer # 阿里云侧信息 # 日志桶不可以是源端桶 LOG_BUCKET = "***oss log bucket***" OSS_ENDPOINT = "***oss endpoint***" # 华为云侧信息 # 请使低权限ak/sk,该ak/sk仅需要iam/oms权限 HW_AK = "***Access Key***" HW_SK = "***Secret Access Key***" # 同步请求接收地址,见同步任务详情 SYNC_URL = "***Synchronization Request Receiving Address***" def handler(event, context): logger = logging.getLogger() # 初始化oss授权 creds = context.credentials auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token) oss_client = oss2.Bucket(auth, OSS_ENDPOINT, LOG_BUCKET) # 解析oss事件同步到oms服务 send_oms_result = False success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2]) failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2]) evt = json.loads(event)["events"][0] object_key = evt["oss"]["object"]["key"] try: logger.info("start sync to oms!") resp = send_oms(object_key) if resp.status_code == 200: logger.info("call oms api success, object_key: {0}".format(object_key)) record_list(logger, oss_client, success_prefix, object_key) send_oms_result = True else: logger.error( "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content))) record_list(logger, oss_client, failed_prefix, object_key) except: logger.error("Internet error, call oms api failed!") record_list(logger, oss_client, failed_prefix, object_key) return send_oms_result def send_oms(object_key): """ apig鉴权后,将oss桶内变更对象信息发送给oms服务,apig鉴权具体方法参考:(https://support.huaweicloud.com/intl/zh-cn/devg-apisign/api-sign-sdk-python.html) :param object_key: 源端新增、修改对象名称 :return: response对象 """ sig = signer.Signer() sig.Key = HW_AK sig.Secret = HW_SK url = SYNC_URL body = json.dumps({"object_keys": [object_key]}) r = signer.HttpRequest("POST", url) r.headers = {"content-type": "application/json"} r.body = body sig.Sign(r) resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body) return resp def record_list(logger, log_bucket_client, prefix, object_key): """ 记录发送到oms成功和失败的对象. :param log_bucket_client: 日志桶客户端 :param prefix: 日志前缀 :param object_key: 成功或失败对象名称 :return: None """ record_key = prefix + object_key try: resp = log_bucket_client.put_object(record_key, "") if resp.status == 200: logger.info("record success or failed object success.") else: logger.error("record success or failed object failed, object_key: {0}".format(object_key)) except: logger.error("record success or failed object failed, object_key: {0}".format(object_key))
- signer.py
import copy import sys import hashlib import hmac import binascii from datetime import datetime if sys.version_info.major < 3: from urllib import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) else: from urllib.parse import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8')) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) def urlencode(s): return quote(s, safe='~') def findHeader(r, header): for k in r.headers: if k.lower() == header.lower(): return r.headers[k] return None # HexEncodeSHA256Hash returns hexcode of sha256 def HexEncodeSHA256Hash(data): sha256 = hashlib.sha256() sha256.update(data) return sha256.hexdigest() # HWS API Gateway Signature class HttpRequest: def __init__(self, method="", url="", headers=None, body=""): self.method = method spl = url.split("://", 1) scheme = 'http' if len(spl) > 1: scheme = spl[0] url = spl[1] query = {} spl = url.split('?', 1) url = spl[0] if len(spl) > 1: for kv in spl[1].split("&"): spl = kv.split("=", 1) key = spl[0] value = "" if len(spl) > 1: value = spl[1] if key != '': key = unquote(key) value = unquote(value) if key in query: query[key].append(value) else: query[key] = [value] spl = url.split('/', 1) host = spl[0] if len(spl) > 1: url = '/' + spl[1] else: url = '/' self.scheme = scheme self.host = host self.uri = url self.query = query if headers is None: self.headers = {} else: self.headers = copy.deepcopy(headers) if sys.version_info.major < 3: self.body = body else: self.body = body.encode("utf-8") BasicDateFormat = "%Y%m%dT%H%M%SZ" Algorithm = "SDK-HMAC-SHA256" HeaderXDate = "X-Sdk-Date" HeaderHost = "host" HeaderAuthorization = "Authorization" HeaderContentSha256 = "x-sdk-content-sha256" # Build a CanonicalRequest from a regular request string # # CanonicalRequest = # HTTPRequestMethod + '\n' + # CanonicalURI + '\n' + # CanonicalQueryString + '\n' + # CanonicalHeaders + '\n' + # SignedHeaders + '\n' + # HexEncode(Hash(RequestPayload)) def CanonicalRequest(r, signedHeaders): canonicalHeaders = CanonicalHeaders(r, signedHeaders) hexencode = findHeader(r, HeaderContentSha256) if hexencode is None: hexencode = HexEncodeSHA256Hash(r.body) return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r), canonicalHeaders, ";".join(signedHeaders), hexencode) def CanonicalURI(r): pattens = unquote(r.uri).split('/') uri = [] for v in pattens: uri.append(urlencode(v)) urlpath = "/".join(uri) if urlpath[-1] != '/': urlpath = urlpath + "/" # always end with / # r.uri = urlpath return urlpath def CanonicalQueryString(r): keys = [] for key in r.query: keys.append(key) keys.sort() a = [] for key in keys: k = urlencode(key) value = r.query[key] if type(value) is list: value.sort() for v in value: kv = k + "=" + urlencode(str(v)) a.append(kv) else: kv = k + "=" + urlencode(str(value)) a.append(kv) return '&'.join(a) def CanonicalHeaders(r, signedHeaders): a = [] __headers = {} for key in r.headers: keyEncoded = key.lower() value = r.headers[key] valueEncoded = value.strip() __headers[keyEncoded] = valueEncoded if sys.version_info.major == 3: r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1') for key in signedHeaders: a.append(key + ":" + __headers[key]) return '\n'.join(a) + "\n" def SignedHeaders(r): a = [] for key in r.headers: a.append(key.lower()) a.sort() return a # Create the HWS Signature. def SignStringToSign(stringToSign, signingKey): hm = hmacsha256(signingKey, stringToSign) return binascii.hexlify(hm).decode() # Get the finalized value for the "Authorization" header. The signature # parameter is the output from SignStringToSign def AuthHeaderValue(signature, AppKey, signedHeaders): return "%s Access=%s, SignedHeaders=%s, Signature=%s" % ( Algorithm, AppKey, ";".join(signedHeaders), signature) class Signer: def __init__(self): self.Key = "" self.Secret = "" def Verify(self, r, authorization): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: return False else: t = datetime.strptime(headerTime, BasicDateFormat) signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) return authorization == SignStringToSign(stringToSign, self.Secret) # SignRequest set Authorization header def Sign(self, r): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: t = datetime.utcnow() r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat) else: t = datetime.strptime(headerTime, BasicDateFormat) haveHost = False for key in r.headers: if key.lower() == 'host': haveHost = True break if not haveHost: r.headers["host"] = r.host signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) signature = SignStringToSign(stringToSign, self.Secret) authValue = AuthHeaderValue(signature, self.Key, signedHeaders) r.headers[HeaderAuthorization] = authValue r.headers["content-length"] = str(len(r.body)) queryString = CanonicalQueryString(r) if queryString != "": r.uri = r.uri + "?" + queryString
- index.py
- 函数创建成功后进入函数详情页,在代码页签修改index.py中的如下配置项。
- LOG_BUCKET = "***oss log bucket***"
- OSS_ENDPOINT = "***oss endpoint***"
- HW_AK = "***Access Key***"
- HW_SK = "***Secret Access Key***"
- SYNC_URL = "***Synchronization Request Receiving Address***"
- 在配置页签,左侧选择触发器,单击“创建触发器”,参考表2,创建OSS事件触发器并单击“确定”保存。
- OSS触发器创建完成后,选择测试页签,事件模板选择“对象存储 OSS”,自定义事件名称,单击“测试函数”执行测试,执行结果为true表示成功。若调用失败,请检查参数配置和函数计算权限配置。
父主题: 各云服务商配置同步请求教程