脚本同级目录 上传文件 :test-mojuan.doc
import requests
import os
import json
from datetime import datetime
MY_COZE_TOKEN = '你的 Coze  Token'
class CozeFileAPI:
    def __init__(self, access_token):
        self.base_url = "https://api.coze.cn/v1"
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}"
        }
    def upload_file(self, file_path):
        """
        上传文件到Coze
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        # 检查文件大小
        file_size = os.path.getsize(file_path)
        if file_size > 512 * 1024 * 1024:  # 512MB
            raise ValueError("文件大小超过512MB限制")
        url = f"{self.base_url}/files/upload"
        
        # 准备文件
        files = {
            'file': (os.path.basename(file_path), open(file_path, 'rb'))
        }
        try:
            response = requests.post(
                url,
                headers=self.headers,
                files=files
            )
            
            # 确保文件被正确关闭
            files['file'][1].close()
            # 检查响应
            response.raise_for_status()
            result = response.json()
            if result.get('code') == 0:
                print("文件上传成功!")
                return result['data']
            else:
                raise Exception(f"上传失败: {result.get('msg', '未知错误')}")
        except requests.exceptions.RequestException as e:
            raise Exception(f"请求错误: {str(e)}")
    def retrieve_file(self, file_id):
        """
        获取文件详情
        """
        url = f"{self.base_url}/files/retrieve"
        
        headers = {
            **self.headers,
            "Content-Type": "application/json"
        }
        
        params = {
            "file_id": file_id
        }
        try:
            response = requests.get(
                url,
                headers=headers,
                params=params
            )
            response.raise_for_status()
            result = response.json()
            if result.get('code') == 0:
                print("获取文件信息成功!")
                return result['data']
            else:
                raise Exception(f"获取文件信息失败: {result.get('msg', '未知错误')}")
        except requests.exceptions.RequestException as e:
            raise Exception(f"请求错误: {str(e)}")
    @staticmethod
    def format_file_info(file_info):
        """
        格式化文件信息显示
        """
        created_time = datetime.fromtimestamp(file_info['created_at']).strftime('%Y-%m-%d %H:%M:%S')
        size_mb = file_info['bytes'] / (1024 * 1024)
        return f"""
文件信息:
- ID: {file_info['id']}
- 文件名: {file_info['file_name']}
- 大小: {size_mb:.2f} MB
- 上传时间: {created_time}
"""
def main():
    ACCESS_TOKEN = MY_COZE_TOKEN
    
    # 创建API实例
    api = CozeFileAPI(ACCESS_TOKEN)
    try:
        # 上传文件测试
        file_path = "test-mojuan.doc"  # 替换为要测试的文件路径
        print(f"\n开始上传文件: {file_path}")
        upload_result = api.upload_file(file_path)
        print(api.format_file_info(upload_result))
        # 获取文件信息测试
        file_id = upload_result['id']
        print(f"\n获取文件信息: {file_id}")
        file_info = api.retrieve_file(file_id)
        print(api.format_file_info(file_info))
    except Exception as e:
        print(f"错误: {str(e)}")
if __name__ == "__main__":
    main()
