Configuring Alibaba Cloud OSS to Send Synchronization Requests

Updated on 2024-05-16 GMT+08:00

Background

After you create a synchronization task, configure the source object storage service so that it asks OMS to migrate each newly added or modified source object. You can do this with a message notification service or a function service on the source platform.

Desired Effects

  • The names of newly added and modified objects in the source bucket can be obtained in real time.
  • The OMS synchronization API can be invoked to migrate the newly added and modified objects to the destination bucket.
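
Both effects can be sketched without any cloud dependency. The snippet below is a minimal illustration, not part of the OMS tooling. It reads object keys from an OSS event payload through the same field path that index.py in the procedure uses (events, oss, object, key) and builds the JSON body that index.py posts to the synchronization request receiving address. The helper names extract_object_keys and build_sync_body and the sample event are hypothetical. Unlike index.py, which reads only the first event, this sketch collects the key of every event in the payload.

```python
import json


def extract_object_keys(event):
    """Return the object keys found in an OSS event payload (bytes or str)."""
    payload = json.loads(event)
    return [evt["oss"]["object"]["key"] for evt in payload.get("events", [])]


def build_sync_body(object_keys):
    """Build the JSON body sent to the synchronization request receiving address."""
    return json.dumps({"object_keys": list(object_keys)})


# Hypothetical, trimmed-down event that contains only the fields used here.
sample_event = json.dumps({
    "events": [
        {"oss": {"object": {"key": "photos/2024/cat.jpg"}}},
    ]
}).encode("utf-8")

keys = extract_object_keys(sample_event)
body = build_sync_body(keys)
print(body)
```

Keeping the parsing and the body construction in separate functions makes it possible to check them locally before the function is deployed.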

Configuration Methods

The following describes how to use the function service on the source platform to send synchronization requests.

Procedure

  1. Log in to the Alibaba Cloud console. In the left navigation pane, choose Functions.
  2. In the top menu bar, select a region. On the Functions page, click Create Function.

    CAUTION:

    The created function must be in the same region as the source bucket.

  3. On the Create Function page, select Event Function, set the following parameters, and click Create.

    Table 1 Parameters required for creating an event function

    Parameter           | Description
    Function Name       | Enter a name.
    Runtime             | Select Python 3.
    Code Upload Method  | Select Upload ZIP. Compress the index.py and signer.py files into a .zip package and upload it.

    • index.py
      # -*- coding: utf-8 -*-
      import logging
      import json
      
      import requests
      import oss2
      import signer
      
      # Alibaba Cloud information
      # Do not use the source bucket as the log bucket.
      LOG_BUCKET = "***oss log bucket***"
      OSS_ENDPOINT = "***oss endpoint***"
      
      # Huawei Cloud information
      # Use the AK/SK pair of an account that has only the required IAM and OMS permissions.
      HW_AK = "***Access Key***"
      HW_SK = "***Secret Access Key***"
      # The address for receiving synchronization requests. Get it from the synchronization task details.
      SYNC_URL = "***Synchronization Request Receiving Address***"
      
      
      def handler(event, context):
          logger = logging.getLogger()
      
          # Initialize an OSS client instance.
          creds = context.credentials
          auth = oss2.StsAuth(creds.access_key_id, creds.access_key_secret, creds.security_token)
          oss_client = oss2.Bucket(auth, OSS_ENDPOINT, LOG_BUCKET)
      
          # Parse OSS events and synchronize them to OMS.
          send_oms_result = False
          success_prefix = "oms_source_record/{0}/success_object/".format(SYNC_URL.split("/")[-2])
          failed_prefix = "oms_source_record/{0}/failed_object/".format(SYNC_URL.split("/")[-2])
          evt = json.loads(event)["events"][0]
          object_key = evt["oss"]["object"]["key"]
          try:
              logger.info("start sync to oms!")
              resp = send_oms(object_key)
              if resp.status_code == 200:
                  logger.info("call oms api success, object_key: {0}".format(object_key))
                  record_list(logger, oss_client, success_prefix, object_key)
                  send_oms_result = True
              else:
                  logger.error(
                      "call oms api fail, status_code: {0}, response_msg: {1}".format(resp.status_code, str(resp.content)))
                  record_list(logger, oss_client, failed_prefix, object_key)
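          except Exception as e:
              # Catch Exception rather than using a bare except, and log the cause.
              logger.error("Internet error, call oms api failed! error: {0}".format(e))
              record_list(logger, oss_client, failed_prefix, object_key)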
          return send_oms_result
      
      
      def send_oms(object_key):
          """
          Sign the request for API Gateway (APIG) authentication and send the changed object information in the OSS bucket to OMS.
          For details about API Gateway authentication, see
          https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-sdk-python.html.
          :param object_key: The name of a new or modified object.
          :return: The response result.
          """
          sig = signer.Signer()
          sig.Key = HW_AK
          sig.Secret = HW_SK
      
          url = SYNC_URL
          body = json.dumps({"object_keys": [object_key]})
          r = signer.HttpRequest("POST", url)
          r.headers = {"content-type": "application/json"}
          r.body = body
          sig.Sign(r)
          resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
          return resp
      
      
      def record_list(logger, log_bucket_client, prefix, object_key):
          """
          Record whether the synchronization request for an object succeeded or failed
          by writing an empty marker object to the log bucket.
          :param logger: The logger.
          :param log_bucket_client: The log bucket client.
          :param prefix: The prefix for successful or failed request records.
          :param object_key: The name of the object that was requested.
          :return: None
          """
          record_key = prefix + object_key
          try:
              resp = log_bucket_client.put_object(record_key, "")
              if resp.status == 200:
                  logger.info("recorded object request result, record_key: {0}".format(record_key))
              else:
                  logger.error("failed to record object request result, object_key: {0}".format(object_key))
          except Exception as e:
              logger.error("failed to record object request result, object_key: {0}, error: {1}".format(object_key, e))
      
    • signer.py
      import copy
      import sys
      import hashlib
      import hmac
      import binascii
      from datetime import datetime
      
      if sys.version_info.major < 3:
          from urllib import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte, message, digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              digest = HexEncodeSHA256Hash(canonicalRequest)
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), digest)
      
      else:
          from urllib.parse import quote, unquote
      
      
          def hmacsha256(keyByte, message):
              return hmac.new(keyByte.encode('utf-8'), message.encode('utf-8'), digestmod=hashlib.sha256).digest()
      
      
          # Create a "String to Sign".
          def StringToSign(canonicalRequest, t):
              digest = HexEncodeSHA256Hash(canonicalRequest.encode('utf-8'))
              return "%s\n%s\n%s" % (Algorithm, datetime.strftime(t, BasicDateFormat), digest)
      
      
      def urlencode(s):
          return quote(s, safe='~')
      
      
      def findHeader(r, header):
          for k in r.headers:
              if k.lower() == header.lower():
                  return r.headers[k]
          return None
      
      
      # HexEncodeSHA256Hash returns hexcode of sha256
      def HexEncodeSHA256Hash(data):
          sha256 = hashlib.sha256()
          sha256.update(data)
          return sha256.hexdigest()
      
      
      # HWS API Gateway Signature
      class HttpRequest:
          def __init__(self, method="", url="", headers=None, body=""):
              self.method = method
              spl = url.split("://", 1)
              scheme = 'http'
              if len(spl) > 1:
                  scheme = spl[0]
                  url = spl[1]
              query = {}
              spl = url.split('?', 1)
              url = spl[0]
              if len(spl) > 1:
                  for kv in spl[1].split("&"):
                      spl = kv.split("=", 1)
                      key = spl[0]
                      value = ""
                      if len(spl) > 1:
                          value = spl[1]
                      if key != '':
                          key = unquote(key)
                          value = unquote(value)
                          if key in query:
                              query[key].append(value)
                          else:
                              query[key] = [value]
              spl = url.split('/', 1)
              host = spl[0]
              if len(spl) > 1:
                  url = '/' + spl[1]
              else:
                  url = '/'
      
              self.scheme = scheme
              self.host = host
              self.uri = url
              self.query = query
              if headers is None:
                  self.headers = {}
              else:
                  self.headers = copy.deepcopy(headers)
              if sys.version_info.major < 3:
                  self.body = body
              else:
                  self.body = body.encode("utf-8")
      
      
      BasicDateFormat = "%Y%m%dT%H%M%SZ"
      Algorithm = "SDK-HMAC-SHA256"
      HeaderXDate = "X-Sdk-Date"
      HeaderHost = "host"
      HeaderAuthorization = "Authorization"
      HeaderContentSha256 = "x-sdk-content-sha256"
      
      
      # Build a CanonicalRequest from a regular request string
      #
      # CanonicalRequest =
      #  HTTPRequestMethod + '\n' +
      #  CanonicalURI + '\n' +
      #  CanonicalQueryString + '\n' +
      #  CanonicalHeaders + '\n' +
      #  SignedHeaders + '\n' +
      #  HexEncode(Hash(RequestPayload))
      def CanonicalRequest(r, signedHeaders):
          canonicalHeaders = CanonicalHeaders(r, signedHeaders)
          hexencode = findHeader(r, HeaderContentSha256)
          if hexencode is None:
              hexencode = HexEncodeSHA256Hash(r.body)
          return "%s\n%s\n%s\n%s\n%s\n%s" % (r.method.upper(), CanonicalURI(r), CanonicalQueryString(r),
                                             canonicalHeaders, ";".join(signedHeaders), hexencode)
      
      
      def CanonicalURI(r):
          segments = unquote(r.uri).split('/')
          uri = []
          for v in segments:
              uri.append(urlencode(v))
          urlpath = "/".join(uri)
          if urlpath[-1] != '/':
              urlpath = urlpath + "/"  # always end with /
          # r.uri = urlpath
          return urlpath
      
      
      def CanonicalQueryString(r):
          keys = []
          for key in r.query:
              keys.append(key)
          keys.sort()
          a = []
          for key in keys:
              k = urlencode(key)
              value = r.query[key]
              if type(value) is list:
                  value.sort()
                  for v in value:
                      kv = k + "=" + urlencode(str(v))
                      a.append(kv)
              else:
                  kv = k + "=" + urlencode(str(value))
                  a.append(kv)
          return '&'.join(a)
      
      
      def CanonicalHeaders(r, signedHeaders):
          a = []
          __headers = {}
          for key in r.headers:
              keyEncoded = key.lower()
              value = r.headers[key]
              valueEncoded = value.strip()
              __headers[keyEncoded] = valueEncoded
              if sys.version_info.major == 3:
                  r.headers[key] = valueEncoded.encode("utf-8").decode('iso-8859-1')
          for key in signedHeaders:
              a.append(key + ":" + __headers[key])
          return '\n'.join(a) + "\n"
      
      
      def SignedHeaders(r):
          a = []
          for key in r.headers:
              a.append(key.lower())
          a.sort()
          return a
      
      
      # Create the HWS Signature.
      def SignStringToSign(stringToSign, signingKey):
          hm = hmacsha256(signingKey, stringToSign)
          return binascii.hexlify(hm).decode()
      
      
      # Get the finalized value for the "Authorization" header.  The signature
      # parameter is the output from SignStringToSign
      def AuthHeaderValue(signature, AppKey, signedHeaders):
          return "%s Access=%s, SignedHeaders=%s, Signature=%s" % (
              Algorithm, AppKey, ";".join(signedHeaders), signature)
      
      
      class Signer:
          def __init__(self):
              self.Key = ""
              self.Secret = ""
      
          def Verify(self, r, authorization):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  return False
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              return authorization == SignStringToSign(stringToSign, self.Secret)
      
          # SignRequest set Authorization header
          def Sign(self, r):
              if sys.version_info.major == 3 and isinstance(r.body, str):
                  r.body = r.body.encode('utf-8')
              headerTime = findHeader(r, HeaderXDate)
              if headerTime is None:
                  t = datetime.utcnow()
                  r.headers[HeaderXDate] = datetime.strftime(t, BasicDateFormat)
              else:
                  t = datetime.strptime(headerTime, BasicDateFormat)
      
              haveHost = False
              for key in r.headers:
                  if key.lower() == 'host':
                      haveHost = True
                      break
              if not haveHost:
                  r.headers["host"] = r.host
              signedHeaders = SignedHeaders(r)
              canonicalRequest = CanonicalRequest(r, signedHeaders)
              stringToSign = StringToSign(canonicalRequest, t)
              signature = SignStringToSign(stringToSign, self.Secret)
              authValue = AuthHeaderValue(signature, self.Key, signedHeaders)
              r.headers[HeaderAuthorization] = authValue
              r.headers["content-length"] = str(len(r.body))
              queryString = CanonicalQueryString(r)
              if queryString != "":
                  r.uri = r.uri + "?" + queryString
      

  4. Go to the function details page. On the Code tab page, replace the placeholder values of the following configuration items in index.py.

    • LOG_BUCKET = "***oss log bucket***"
    • OSS_ENDPOINT = "***oss endpoint***"
    • HW_AK = "***Access Key***"
    • HW_SK = "***Secret Access Key***"
    • SYNC_URL = "***Synchronization Request Receiving Address***"

  5. On the Configuration tab page, choose Trigger on the left and click Create Trigger. Create an OSS event trigger by referring to Table 2 and click OK.

    Table 2 Parameters required for creating a trigger

    Parameter         | Description
    Trigger Type      | Choose OSS.
    Name              | Enter a name.
    Version or Alias  | Retain the default value.
    Bucket Name       | Select the source bucket. The bucket must be the one in the synchronization task.
    Trigger Event     | Retain the default value.
    Role Name         | Retain the default value.

  6. Click the Test tab, set Event Template to OSS, enter an event name, and click Test Function. If the execution result is true, the test is successful. If the test fails, check whether the function parameters are configured correctly and whether you have the required permissions.
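
If the test fails, a quick local check of the configuration values can rule out one likely mistake: a placeholder that was never replaced. The sketch below is a hypothetical helper, not part of index.py. find_unset_placeholders flags values that are empty or still wrapped in the *** markers used in the template, and record_task_segment shows which part of SYNC_URL index.py uses to name its record prefixes (the second-to-last path segment). All sample values, including the URL, are made up for illustration and do not reflect the real address format.

```python
def find_unset_placeholders(config):
    """Return the names of settings that are empty or still hold a ***...*** value."""
    return sorted(
        name for name, value in config.items()
        if not value or (value.startswith("***") and value.endswith("***"))
    )


def record_task_segment(sync_url):
    """Return the second-to-last path segment, as index.py does for its prefixes."""
    return sync_url.split("/")[-2]


# Hypothetical values for illustration only.
config = {
    "LOG_BUCKET": "my-log-bucket",
    "OSS_ENDPOINT": "***oss endpoint***",
    "HW_AK": "EXAMPLEAK",
    "HW_SK": "***Secret Access Key***",
    "SYNC_URL": "https://example.invalid/v2/sync/task-123/events",
}

unset = find_unset_placeholders(config)
segment = record_task_segment(config["SYNC_URL"])
print("Still unset:", unset)
print("Record prefix: oms_source_record/{0}/success_object/".format(segment))
```

An empty list from find_unset_placeholders only shows that the placeholders were replaced. It does not confirm that the values are valid or that the account has the required permissions.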
