购买OSS标准存储包之后,就可以使用OSS的各种服务了
OSS访问域名和数据中心_对象存储(OSS)-阿里云帮助中心
在官方的表格上找到自己OSS标准存储包对应的外网Endpoint即可
- # -*- coding: utf-8 -*-
- import oss2
- import os
- from oss2.credentials import EnvironmentVariableCredentialsProvider
-
- # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
- auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
- # yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
- # 填写Bucket名称。
- bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
-
- # 必须以二进制的方式打开文件。
- # 填写本地文件的完整路径。如果未指定本地路径,则默认从示例程序所属项目对应本地路径中上传文件。
- with open('D:\\localpath\\examplefile.txt', 'rb') as fileobj:
- # Seek方法用于指定从第1000个字节位置开始读写。上传时会从您指定的第1000个字节位置开始上传,直到文件结束。
- fileobj.seek(1000, os.SEEK_SET)
- # Tell方法用于返回当前位置。
- current = fileobj.tell()
- # 填写Object完整路径。Object完整路径中不能包含Bucket名称。
- bucket.put_object('exampleobject.txt', fileobj)
- # -*- coding: utf-8 -*-
- import json
- import base64
- import oss2
- from oss2.credentials import EnvironmentVariableCredentialsProvider
-
- # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
- auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
- # yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
- # 填写Bucket名称。
- bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
-
- # 定义回调参数Base64编码函数。
- def encode_callback(callback_params):
- cb_str = json.dumps(callback_params).strip()
- return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
-
- # 设置上传回调参数。
- callback_params = {}
- # 设置回调请求的服务器地址,例如http://oss-demo.aliyuncs.com:23450。
- callback_params['callbackUrl'] = 'http://oss-demo.aliyuncs.com:23450'
- #(可选)设置回调请求消息头中Host的值,即您的服务器配置Host的值。
- #callback_params['callbackHost'] = 'yourCallbackHost'
- # 设置发起回调时请求body的值。
- callback_params['callbackBody'] = 'bucket=${bucket}&object=${object}'
- # 设置发起回调请求的Content-Type。
- callback_params['callbackBodyType'] = 'application/x-www-form-urlencoded'
- encoded_callback = encode_callback(callback_params)
- # 设置发起回调请求的自定义参数,由Key和Value组成,Key必须以x:开始。
- callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
- encoded_callback_var = encode_callback(callback_var_params)
-
- # 上传回调。
- params = {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
- # 填写Object完整路径和字符串。Object完整路径中不能包含Bucket名称。
- result = bucket.put_object('examplefiles/exampleobject.txt', 'a'*1024*1024, params)
- # -*- coding: utf-8 -*-
- from __future__ import print_function
- import os, sys
- import oss2
- from oss2.credentials import EnvironmentVariableCredentialsProvider
- # 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
- auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
- # 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
- # yourBucketName填写存储空间名称。
- bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
- # consumed_bytes表示已上传的数据量。
- # total_bytes表示待上传的总数据量。当无法确定待上传的数据长度时,total_bytes的值为None。
- def percentage(consumed_bytes, total_bytes):
- if total_bytes:
- rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
- print('\r{0}% '.format(rate), end='')
- sys.stdout.flush()
- # progress_callback为可选参数,用于实现进度条功能。
- bucket.put_object('yourObjectName', 'a'*1024*1024, progress_callback=percentage)
- import json
- import base64
- import oss2
- from oss_bucket import get_bucket
- import sys
-
-
- # 定义回调参数Base64编码函数。
- def encode_callback(callback_params):
- cb_str = json.dumps(callback_params).strip()
- return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
-
-
- def prepare_callback_params():
- # 设置上传回调参数。
- callback_params = {
- 'callbackUrl': 'http://oss-demo.aliyuncs.com:23450',
- 'callbackBody': 'bucket=${bucket}&object=${object}',
- 'callbackBodyType': 'application/x-www-form-urlencoded'
- }
- encoded_callback = encode_callback(callback_params)
-
- # 设置发起回调请求的自定义参数,由Key和Value组成,Key必须以x:开始。
- callback_var_params = {'x:my_var1': 'my_val1', 'x:my_var2': 'my_val2'}
- encoded_callback_var = encode_callback(callback_var_params)
-
- return {'x-oss-callback': encoded_callback, 'x-oss-callback-var': encoded_callback_var}
-
-
- def percentage(consumed_bytes, total_bytes):
- if total_bytes:
- rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
- print('\r{0}% '.format(rate), end='')
- sys.stdout.flush()
-
-
- if __name__ == '__main__':
-
- # 上传回调。
- params = prepare_callback_params()
- bucket = get_bucket()
- object_name = 'examplefiles/exampleobject.txt'
-
- try:
- # 填写Object完整路径和字符串。Object完整路径中不能包含Bucket名称。
- result = bucket.put_object(object_name, 'a' * 1024 * 1024, params, progress_callback=percentage)
- print("\nUpload successful!")
- print(f"Result:{result}")
- print(f"bucket.bucket_name:{bucket.bucket_name}")
- print(f"bucket.endpointL:{bucket.endpoint}")
- print(f"object_name:{object_name}")
- file_url = f"https://{bucket.bucket_name}.{bucket.endpoint.replace('https://', '')}/{object_name}"
- print(f"File URL: {file_url}")
- except oss2.exceptions.RequestError as e:
- print(f"Request error: {e}")
- except oss2.exceptions.ServerError as e:
- print(f"Server error: {e}")
- except oss2.exceptions.NoSuchBucket as e:
- print(f"No such bucket: {e}")
- except Exception as e:
- print(f"Unexpected error: {e}")