Configuring Alibaba Cloud OSS to Send Synchronization Requests
Background
After you create a synchronization task, you need to configure the source object storage service to request OMS to migrate newly added and modified source objects. To do so, you can use a message notification or function service on the source platform.
Desired Effects
- The names of newly added and modified objects in the source bucket can be obtained in real time.
- The OMS synchronization API can be invoked to migrate the newly added and modified objects to the destination bucket.
Configuration Methods
The following describes how to use the function service on the source platform to send synchronization requests.
Procedure
- Log in to the Alibaba Cloud console. In the left navigation pane, choose Functions.
- In the top menu bar, select a region. On the Functions page, click Create Function.
The created function must be in the same region as the source bucket.
- On the Create Function page, select Event Function, set the following parameters, and click Create.
Table 1 Parameters required for creating an event function Parameter
Description
Function Name
Enter a name.
Runtime
Select Python 3.
Code Upload Method
Select Upload ZIP. Compress the index.py and signer.py files into a .zip package and upload it.
- index.py
# -*- coding: utf-8 -*- import logging import json import requests import oss2 import signer # Alibaba Cloud information # Do not use the source bucket as the log bucket. LOG_BUCKET = "***oss log bucket***" OSS_ENDPOINT = "***oss endpoint***" # Huawei Cloud information #Use the AK/SK pair of an account that has only the required IAM and OMS permissions. HW_AK = "***Access Key***" HW_SK = "***Secret Access Key***" # The address for receiving synchronization requests. Get it from the synchronization task details. SYNC_URL = "***Synchronization Request Receiving Address***" def handler(event, context): logger = logging.getLogger() # Initialize an OSS client instance. creds = context.credentials auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token) oss_client = oss2.Bucket(auth, OSS_ENDPOINT, LOG_BUCKET) # Parse OSS events and synchronize them to OMS. send_oms_result = False success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2]) failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2]) evt = json.loads(event)["events"][0] object_key = evt["oss"]["object"]["key"] try: logger.info("start sync to oms!") resp = send_oms(object_key) if resp.status_code == 200: logger.info("call oms api success, object_key: {0}".format(object_key)) record_list(logger, oss_client, success_prefix, object_key) send_oms_result = True else: logger.error( "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content))) record_list(logger, oss_client, failed_prefix, object_key) except: logger.error("Internet error, call oms api failed!") record_list(logger, oss_client, failed_prefix, object_key) return send_oms_result def send_oms(object_key): """ After APIG authentication succeeds, the changed object information in the OSS bucket is sent to OMS. For details about API Gateway authentication, see https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-sdk-python.html. :param object_key: The name of a new or modified object. :return: The response result. """ sig = signer.Signer() sig.Key = HW_AK sig.Secret = HW_SK url = SYNC_URL body = json.dumps({"object_keys": [object_key]}) r = signer.HttpRequest("POST", url) r.headers = {"content-type": "application/json"} r.body = body sig.Sign(r) resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body) return resp def record_list(logger, log_bucket_client, prefix, object_key): """ Record the requested and unrequested objects. :param log_bucket_client: The log bucket client. :param prefix: Prefixes of the requested and unrequested objects. :param object_key: The names of requested and unrequested objects. :return: None """ record_key = prefix + object_key try: resp = log_bucket_client.put_object(record_key, "") if resp.status == 200: logger.info("record success or failed object success.") else: logger.error("record success or failed object failed, object_key: {0}".format(object_key)) except: logger.error("record success or failed object failed, object_key: {0}".format(object_key))
- signer.py
import copy import sys import hashlib import hmac import binascii from datetime import datetime if sys.version_info.major < 3: from urllib import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) else: from urllib.parse import quote, unquote def hmacsha256(keyByte, message): return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest() # Create a "String to Sign". def StringToSign(canonicalRequest, t): bytes = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8')) return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), bytes) def urlencode(s): return quote(s, safe='~') def findHeader(r, header): for k in r.headers: if k.lower() == header.lower(): return r.headers[k] return None # HexEncodeSHA256Hash returns hexcode of sha256 def HexEncodeSHA256Hash(data): sha256 = hashlib.sha256() sha256.update(data) return sha256.hexdigest() # HWS API Gateway Signature class HttpRequest: def __init__(self, method="", url="", headers=None, body=""): self.method = method spl = url.split("://", 1) scheme = 'http' if len(spl) > 1: scheme = spl[0] url = spl[1] query = {} spl = url.split('?', 1) url = spl[0] if len(spl) > 1: for kv in spl[1].split("&"): spl = kv.split("=", 1) key = spl[0] value = "" if len(spl) > 1: value = spl[1] if key != '': key = unquote(key) value = unquote(value) if key in query: query[key].append(value) else: query[key] = [value] spl = url.split('/', 1) host = spl[0] if len(spl) > 1: url = '/' + spl[1] else: url = '/' self.scheme = scheme self.host = host self.uri = url self.query = query if headers is None: self.headers = {} else: self.headers = copy.deepcopy(headers) if sys.version_info.major < 3: self.body = body else: self.body = body.encode("utf-8") BasicDateFormat = "%Y%m%dT%H%M%SZ" Algorithm = "SDK-HMAC-SHA256" HeaderXDate = "X-Sdk-Date" HeaderHost = "host" HeaderAuthorization = "Authorization" HeaderContentSha256 = "x-sdk-content-sha256" # Build a CanonicalRequest from a regular request string # # CanonicalRequest = # HTTPRequestMethod + '\n' + # CanonicalURI + '\n' + # CanonicalQueryString + '\n' + # CanonicalHeaders + '\n' + # SignedHeaders + '\n' + # HexEncode(Hash(RequestPayload)) def CanonicalRequest(r, signedHeaders): canonicalHeaders = CanonicalHeaders(r, signedHeaders) hexencode = findHeader(r, HeaderContentSha256) if hexencode is None: hexencode = HexEncodeSHA256Hash(r.body) return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r), canonicalHeaders, ";".join(signedHeaders), hexencode) def CanonicalURI(r): pattens = unquote(r.uri).split('/') uri = [] for v in pattens: uri.append(urlencode(v)) urlpath = "/".join(uri) if urlpath[-1] != '/': urlpath = urlpath + "/" # always end with / # r.uri = urlpath return urlpath def CanonicalQueryString(r): keys = [] for key in r.query: keys.append(key) keys.sort() a = [] for key in keys: k = urlencode(key) value = r.query[key] if type(value) is list: value.sort() for v in value: kv = k + "=" + urlencode(str(v)) a.append(kv) else: kv = k + "=" + urlencode(str(value)) a.append(kv) return '&'.join(a) def CanonicalHeaders(r, signedHeaders): a = [] __headers = {} for key in r.headers: keyEncoded = key.lower() value = r.headers[key] valueEncoded = value.strip() __headers[keyEncoded] = valueEncoded if sys.version_info.major == 3: r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1') for key in signedHeaders: a.append(key + ":" + __headers[key]) return '\n'.join(a) + "\n" def SignedHeaders(r): a = [] for key in r.headers: a.append(key.lower()) a.sort() return a # Create the HWS Signature. def SignStringToSign(stringToSign, signingKey): hm = hmacsha256(signingKey, stringToSign) return binascii.hexlify(hm).decode() # Get the finalized value for the "Authorization" header. The signature # parameter is the output from SignStringToSign def AuthHeaderValue(signature, AppKey, signedHeaders): return "%s Access=%s, SignedHeaders=%s, Signature=%s" % ( Algorithm, AppKey, ";".join(signedHeaders), signature) class Signer: def __init__(self): self.Key = "" self.Secret = "" def Verify(self, r, authorization): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: return False else: t = datetime.strptime(headerTime, BasicDateFormat) signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) return authorization == SignStringToSign(stringToSign, self.Secret) # SignRequest set Authorization header def Sign(self, r): if sys.version_info.major == 3 and isinstance(r.body, str): r.body = r.body.encode('utf-8') headerTime = findHeader(r, HeaderXDate) if headerTime is None: t = datetime.utcnow() r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat) else: t = datetime.strptime(headerTime, BasicDateFormat) haveHost = False for key in r.headers: if key.lower() == 'host': haveHost = True break if not haveHost: r.headers["host"] = r.host signedHeaders = SignedHeaders(r) canonicalRequest = CanonicalRequest(r, signedHeaders) stringToSign = StringToSign(canonicalRequest, t) signature = SignStringToSign(stringToSign, self.Secret) authValue = AuthHeaderValue(signature, self.Key, signedHeaders) r.headers[HeaderAuthorization] = authValue r.headers["content-length"] = str(len(r.body)) queryString = CanonicalQueryString(r) if queryString != "": r.uri = r.uri + "?" + queryString
- index.py
- Go to the function details page and on the Code tab page, modify the following configuration items in index.py.
- LOG_BUCKET = "***oss log bucket***"
- OSS_ENDPOINT = "***oss endpoint***"
- HW_AK = "***Access Key***"
- HW_SK = "***Secret Access Key***"
- SYNC_URL = "***Synchronization Request Receiving Address***"
- On the Configuration tab page, choose Trigger on the left and click Create Trigger. Create an OSS event trigger by referring to Table 2 and click OK.
Table 2 Parameters required for creating a trigger Parameter
Description
Trigger Type
Choose OSS.
Name
Enter a name.
Version or Alias
Retain the default value.
Bucket Name
Select the source bucket. The bucket must be the one in the synchronization task.
Trigger Event
Retain the default value.
Role Name
Retain the default value.
- Click the Test tab, set Event Template to OSS, customize an event name, and click Test Function to perform the test. If the execution result is true, the test is successful. If the test fails, check whether the function parameters are configured correctly and whether you have required permissions.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot