Goodbye, COW Plugins: Give Your WeChat Bot a Pair of Carslan "Big Eyes" in 3 Minutes

picture.image

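(Attached: an image generated by an Agent platform that is still in beta.)

Right now, if you want your WeChat bot to "see" images, the only option is a third-party plugin.

Third-party plugins require buying extra tokens, and editing the plugin's prompt is inconvenient.

After the last co-learning session, most of you connected WeChat through Coze. Leaving image recognition out wastes what Coze can do, while going through a plugin is a hassle and still means paying for tokens.

Quite a few people have been stuck on this.

picture.image

On top of that, this guy tipped me 200.

picture.image

People usually freeload off me. This time I suddenly found myself freeloading off someone else,

and for a moment I actually felt a bit guilty.

So, here we go.

Adding "vision" really comes down to turning the image received on WeChat into an image link and passing that link to Coze. Inside Coze, a model or plugin with image recognition does the rest.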
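The flow described above can be sketched roughly as follows. This is only an illustration, not the code used later in this article: `image_to_query`, `fake_upload`, and the `img.example.com` address are hypothetical names, and the fake uploader stands in for whatever image conversion service you end up using.

```python
from typing import Callable


def image_to_query(image_bytes: bytes, upload: Callable[[bytes], str]) -> str:
    """Turn a received image into the text query that is sent to Coze.

    `upload` is a hypothetical callable that stores the image somewhere
    publicly reachable and returns its URL as text.
    """
    url = upload(image_bytes).strip()
    if not url.startswith(("http://", "https://")):
        raise ValueError(f"uploader did not return a URL: {url!r}")
    # The URL itself becomes the query. The image recognition model or
    # plugin configured inside Coze is what actually looks at the picture.
    return url


def fake_upload(image_bytes: bytes) -> str:
    # Stand-in for a real image service, used here only for demonstration.
    return f"https://img.example.com/{len(image_bytes)}.png\n"


print(image_to_query(b"\x89PNG fake image data", fake_upload))
```

Passing the uploader in as a parameter keeps the sketch independent of any particular image host, which is the one piece this article leaves for you to fill in.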

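To convert images, the obvious first choice is an image hosting platform, so I asked a friend to help wire up their APIs. We were in full swing when we came across this:

picture.image

That sobered me up right away...

So in the end I built my own image conversion service.

I don't dare test the law myself, but my little server can't handle everyone's traffic either.

So this article gives you all the code that implements the feature, with only the image service endpoint left out.

There are two ways to use it:

1. Ask Kimi to help you find an image conversion API and plug it in, buy a service, or test the law yourself.

2. If you can't be bothered to find an API and don't want to test the law either, you can purchase the paid article and I will whitelist your server.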
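If you go the self-hosted route, the storage half of an image conversion service can be quite small. The sketch below is an assumption about how such a service might work, not the service used by this article: `save_image`, `guess_extension`, and the example base URL are hypothetical, and the HTTP upload endpoint and the web server that exposes the storage directory are left out.

```python
import hashlib
import tempfile
from pathlib import Path

# Magic-byte prefixes for the image formats a chat bot is likely to receive.
_SIGNATURES = {
    b"\x89PNG\r\n\x1a\n": ".png",
    b"\xff\xd8\xff": ".jpg",
    b"GIF87a": ".gif",
    b"GIF89a": ".gif",
}


def guess_extension(data: bytes) -> str:
    """Pick a file extension from the leading bytes of the image."""
    for magic, ext in _SIGNATURES.items():
        if data.startswith(magic):
            return ext
    raise ValueError("unsupported or corrupt image data")


def save_image(data: bytes, storage_dir: Path, public_base_url: str) -> str:
    """Store the bytes under a content-hash name and return the public URL."""
    name = hashlib.sha256(data).hexdigest()[:16] + guess_extension(data)
    storage_dir.mkdir(parents=True, exist_ok=True)
    (storage_dir / name).write_bytes(data)
    return f"{public_base_url.rstrip('/')}/{name}"


with tempfile.TemporaryDirectory() as tmp:
    png = b"\x89PNG\r\n\x1a\n" + b"\x00" * 8
    print(save_image(png, Path(tmp), "https://img.example.com/files/"))
```

Naming files by content hash means the same picture uploaded twice maps to the same URL, and checking the magic bytes rejects uploads that are not images at all.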


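The tutorial is simple: there are two files in total, and you just replace them.

Step 1

Open your COW project, i.e. chatgpt-on-wechat.

Under the path /root/chatgpt-on-wechat/channel/wechat,

find the file wechat_channel.py.

Replace its entire contents with the following code: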

  
# encoding:utf-8  
  
"""  
wechat channel  
"""  
  
import io  
import json  
import os  
import re  
import threading  
import time  
  
import requests  
import random  
  
from bridge.context import *  
from bridge.reply import *  
from channel.chat_channel import ChatChannel  
from channel import chat_channel  
from channel.wechat.wechat_message import *  
from common.expired_dict import ExpiredDict  
from common.log import logger  
from common.singleton import singleton  
from common.time_check import time_checker  
from config import conf, get_appdata_dir  
from lib import itchat  
from lib.itchat.content import *  
  
  
@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
def handler_single_msg(msg):
    try:
        cmsg = WechatMessage(msg, False)
    except NotImplementedError as e:
        logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_single(cmsg)
    return None


@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING], isGroupChat=True)
def handler_group_msg(msg):
    try:
        cmsg = WechatMessage(msg, True)
    except NotImplementedError as e:
        logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_group(cmsg)
    return None
  
  
def _check(func):
    def wrapper(self, cmsg: ChatMessage):
        msgId = cmsg.msg_id
        if msgId in self.receivedMsgs:
            logger.info("Wechat message {} already received, ignore".format(msgId))
            return
        self.receivedMsgs[msgId] = True
        create_time = cmsg.create_time  # message timestamp
        if conf().get("hot_reload") == True and int(create_time) < int(time.time()) - 60:  # skip history messages older than 1 minute
            logger.debug("[WX]history message {} skipped".format(msgId))
            return
        if cmsg.my_msg and not cmsg.is_group:
            logger.debug("[WX]my message {} skipped".format(msgId))
            return
        return func(self, cmsg)

    return wrapper
  
  
# Available QR code generation APIs
# https://api.qrserver.com/v1/create-qr-code/?size=400×400&data=https://www.abc.com
# https://api.isoyu.com/qr/?m=1&e=L&p=20&url=https://www.abc.com
def qrCallback(uuid, status, qrcode):
    # logger.debug("qrCallback: {} {}".format(uuid,status))
    if status == "0":
        try:
            from PIL import Image

            img = Image.open(io.BytesIO(qrcode))
            _thread = threading.Thread(target=img.show, args=("QRCode",))
            _thread.daemon = True
            _thread.start()
        except Exception as e:
            pass

        import qrcode

        url = f"https://login.weixin.qq.com/l/{uuid}"

        qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
        qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400×400&data={}".format(url)
        qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
        qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
        print("You can also scan QRCode in any website below:")
        print(qr_api3)
        print(qr_api4)
        print(qr_api2)
        print(qr_api1)
        _send_qr_code([qr_api1, qr_api2, qr_api3, qr_api4])
        qr = qrcode.QRCode(border=1)
        qr.add_data(url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)
  
  
@singleton
class WechatChannel(ChatChannel):
    NOT_SUPPORT_REPLYTYPE = []

    def __init__(self):
        super().__init__()
        self.receivedMsgs = ExpiredDict(60 * 60)
        self.auto_login_times = 0

    def startup(self):
        try:
            itchat.instance.receivingRetryCount = 600  # adjust the disconnect timeout
            # login by scan QRCode
            hotReload = conf().get("hot_reload", False)
            status_path = os.path.join(get_appdata_dir(), "itchat.pkl")
            itchat.auto_login(
                enableCmdQR=2,
                hotReload=hotReload,
                statusStorageDir=status_path,
                qrCallback=qrCallback,
                exitCallback=self.exitCallback,
                loginCallback=self.loginCallback
            )
            self.user_id = itchat.instance.storageClass.userName
            self.name = itchat.instance.storageClass.nickName
            logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
            # start message listener
            itchat.run()
        except Exception as e:
            logger.error(e)

    def exitCallback(self):
        try:
            from common.linkai_client import chat_client
            if chat_client.client_id and conf().get("use_linkai"):
                _send_logout()
                time.sleep(2)
                self.auto_login_times += 1
                if self.auto_login_times < 100:
                    chat_channel.handler_pool._shutdown = False
                    self.startup()
        except Exception as e:
            pass

    def loginCallback(self):
        logger.debug("Login success")
        _send_login_success()
  
    # The handle_* functions process a received message, build a Context, and pass it to produce(), which handles the Context and sends the reply
    # Context holds all the information about a message, with the following attributes
    #   type     message type, including TEXT, VOICE, IMAGE_CREATE
    #   content  message content: the text for TEXT, the voice file name for VOICE, the image generation command for IMAGE_CREATE
    #   kwargs   dict of extra parameters with the following keys:
    #     session_id: session id
    #     isgroup: whether this is a group chat
    #     receiver: who the reply should be sent to
    #     msg: the ChatMessage object
    #     origin_ctype: original message type; after speech-to-text, if prefix matching fails in a private chat, the trigger rules are relaxed depending on whether the original message was voice
    #     desire_rtype: desired reply type; defaults to text, set to ReplyType.VOICE for a voice reply
    @time_checker
    @_check
    def handle_single(self, cmsg: ChatMessage):
        # filter system message
        if cmsg.other_user_id in ["weixin"]:
            return
        if cmsg.ctype == ContextType.VOICE:
            if conf().get("speech_recognition") != True:
                return
            logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.IMAGE:
            # download the image to the local path stored in cmsg.content
            cmsg.prepare()
            logger.debug("[WX]receive image msg: {},------{}".format(cmsg.content, cmsg._prepare_fn))
        elif cmsg.ctype == ContextType.PATPAT:
            logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.TEXT:
            logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
        else:
            logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
        context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
        if context:
            self.produce(context)

    @time_checker
    @_check
    def handle_group(self, cmsg: ChatMessage):
        if cmsg.ctype == ContextType.VOICE:
            if conf().get("group_speech_recognition") != True:
                return
            logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.IMAGE:
            # download the image to the local path stored in cmsg.content
            cmsg.prepare()
            logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
        elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.ACCEPT_FRIEND,
                            ContextType.EXIT_GROUP]:
            logger.debug("[WX]receive note msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.TEXT:
            # logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
            pass
        elif cmsg.ctype == ContextType.FILE:
            logger.debug(f"[WX]receive attachment msg, file_name={cmsg.content}")
        else:
            logger.debug("[WX]receive group msg: {}".format(cmsg.content))
        context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg)
        if context:
            self.produce(context)
  
    # Unified send function. Each Channel implements its own and sends a different kind of message depending on reply.type

    def _send_image_url(self, img_url, receiver):
        # Download the image into memory, then send it to the receiver
        logger.debug(f"[WX] start download image, img_url={img_url}")
        pic_res = requests.get(img_url, stream=True)
        image_storage = io.BytesIO()
        size = 0
        for block in pic_res.iter_content(1024):
            size += len(block)
            image_storage.write(block)
        logger.info(f"[WX] download image success, size={size}, img_url={img_url}")
        image_storage.seek(0)
        itchat.send_image(image_storage, toUserName=receiver)
        logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
        image_storage.close()

    def send(self, reply: Reply, context: Context):
        receiver = context["receiver"]
        if reply.type == ReplyType.TEXT:
            # Markers on which a reply is split into separate messages
            split_punctuation = ['//n']
            # Regex pattern used to split the reply on those markers
            pattern = '|'.join(map(lambda x: re.escape(x), split_punctuation))
            # session_ago may not exist on the stock Reply class, so read it defensively
            session_ago = getattr(reply, "session_ago", None)
            if session_ago and len([con["content"] for con in session_ago if con['role'] == "assistant"]) > 1:
                # If the reply repeats earlier assistant messages, strip the repeated part
                old_messages = [str(con["content"]).replace('\n', '').replace('\\n', '') for con in session_ago if con['role'] == "assistant"][:-1]
                old_str_data = " ".join(old_messages)
                if old_str_data in str(reply.content).replace('\n', '').replace('\\n', ''):
                    logger.info(f'~~~~~~~~~~~~~~~replace:{old_str_data}')
                    logger.info(f'*******************ago*****************{reply.content}')
                    reply.content = str(reply.content).replace('\n', '').replace('\\n', '').replace(old_str_data, '')
                    logger.info(f'*******************now*****************{reply.content}')
            # Split the reply with the regex
            split_messages = re.split(pattern, reply.content)
            logger.debug(f"[WX] session_ago={session_ago}")
            # Drop empty chunks
            split_messages = [msg.strip() for msg in split_messages if msg.strip() != '']

            for msg in split_messages:
                text = str(msg)
                # Collect the image URLs contained in this chunk
                if "![Image]" in text:
                    img_url_list = re.findall(r'\[Image\]\((.*?)\)', text)
                elif "你可以点击[这里]" in text:  # literal phrase in the bot's reply: "you can click [here]"
                    img_url_list = re.findall(r'\((.*?)\)', text)
                else:
                    img_url_list = [m[0] for m in re.findall(r'(http.*?\.(png|jpg|gif))', text)]
                try:
                    for img_url in img_url_list:
                        self._send_image_url(str(img_url).replace('(', '').replace(')', ''), receiver)
                except Exception as e:
                    logger.error(f"[WX] image conversion error: {e}")
                # The text itself is always sent, after any images
                itchat.send(msg, toUserName=receiver)
                logger.info("[WX] sendMsg={}, receiver={}".format(msg, receiver))
                # Wait a random interval between messages
                time.sleep(random.uniform(1, 2))

        elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
            itchat.send(reply.content, toUserName=receiver)
            logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
        elif reply.type == ReplyType.VOICE:
            itchat.send_file(reply.content, toUserName=receiver)
            logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))
        elif reply.type == ReplyType.IMAGE_URL:  # download the image from the network
            self._send_image_url(reply.content, receiver)
        elif reply.type == ReplyType.IMAGE:  # read the image from a file
            image_storage = reply.content
            itchat.send_image(image_storage, toUserName=receiver)
            logger.info("[WX] sendImage, receiver={}".format(receiver))
        elif reply.type == ReplyType.FILE:  # file reply type
            file_storage = reply.content
            itchat.send_file(file_storage, toUserName=receiver)
            logger.info("[WX] sendFile, receiver={}".format(receiver))
        elif reply.type == ReplyType.VIDEO:  # video reply type
            video_storage = reply.content
            itchat.send_video(video_storage, toUserName=receiver)
            logger.info("[WX] sendFile, receiver={}".format(receiver))
        elif reply.type == ReplyType.VIDEO_URL:  # video URL reply type
            video_url = reply.content
            logger.debug(f"[WX] start download video, video_url={video_url}")
            video_res = requests.get(video_url, stream=True)
            video_storage = io.BytesIO()
            size = 0
            for block in video_res.iter_content(1024):
                size += len(block)
                video_storage.write(block)
            logger.info(f"[WX] download video success, size={size}, video_url={video_url}")
            video_storage.seek(0)
            itchat.send_video(video_storage, toUserName=receiver)
            logger.info("[WX] sendVideo url={}, receiver={}".format(video_url, receiver))
  
  
def _send_login_success():
    try:
        from common.linkai_client import chat_client
        if chat_client.client_id:
            chat_client.send_login_success()
    except Exception as e:
        pass


def _send_logout():
    try:
        from common.linkai_client import chat_client
        if chat_client.client_id:
            chat_client.send_logout()
    except Exception as e:
        pass


def _send_qr_code(qrcode_list: list):
    try:
        from common.linkai_client import chat_client
        if chat_client.client_id:
            chat_client.send_qrcode(qrcode_list)
    except Exception as e:
        pass
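
The part of send() in the file above that differs most from a plain text reply is how it cuts a reply into several messages and pulls image links out of each one. As a rough standalone sketch of that idea (the function names `split_reply` and `extract_image_urls` are made up for this illustration, and the "click here" branch is left out):

```python
import re
from typing import List

SPLIT_MARKER = "//n"  # same marker that send() splits on


def split_reply(content: str) -> List[str]:
    """Cut a reply into non-empty chunks, one WeChat message per chunk."""
    parts = re.split(re.escape(SPLIT_MARKER), content)
    return [p.strip() for p in parts if p.strip()]


def extract_image_urls(message: str) -> List[str]:
    """Find image links written as ![Image](url) or as bare http...png/jpg/gif."""
    if "![Image]" in message:
        return re.findall(r"\[Image\]\((.*?)\)", message)
    # findall returns (full_url, extension) tuples here; keep only the URL.
    return [m[0] for m in re.findall(r"(http.*?\.(png|jpg|gif))", message)]


for chunk in split_reply("Here you go//n![Image](https://img.example.com/a.png)"):
    print(chunk, extract_image_urls(chunk))
```

Each chunk is then sent as its own message, and any URLs found in it are downloaded and sent as images.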

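Step 2

Go to /root/chatgpt-on-wechat/bot/bytedance (if you can't find this path, go back to the co-learning docs and catch up on the Coze API integration tutorial).

Open bytedance_coze_bot.py and paste in all of the code below, replacing its contents.

Note: the part marked in red in the code below (the address passed to requests.post for the image conversion service) is the endpoint you need to find yourself.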

  
# encoding:utf-8  
  
import time  
from typing import List, Tuple  
import os  
import requests  
from requests import Response  
  
from bot.bot import Bot  
from bot.chatgpt.chat_gpt_session import ChatGPTSession  
from bot.session_manager import SessionManager  
from bridge.context import ContextType  
from bridge.reply import Reply, ReplyType  
from common.log import logger  
from config import conf  
  
  
class ByteDanceCozeBot(Bot):
    def __init__(self):
        super().__init__()
        self.sessions = SessionManager(ChatGPTSession, model=conf().get("model") or "coze")

    def reply(self, query, context=None):
        # acquire reply content
        if context.type == ContextType.TEXT:
            logger.info("[COZE] query={}".format(query))
            if str(query).endswith(".png"):
                # The query is the path of a downloaded image: upload it and use the returned URL as the query
                image_path = os.path.join(os.getcwd(), query)
                with open(image_path, "rb") as f:
                    img_content = f.read()
                # Replace the placeholder below with the address of your own image conversion service
                res = requests.post("<address of your image conversion service>",
                                    files={'file': ('image.jpg', img_content)})
                if not res.ok:
                    return Reply(ReplyType.ERROR, "Image upload failed")
                img_on_url = res.text
                logger.info(f'converted image URL: {img_on_url}')
                query = img_on_url
                try:
                    # the local copy is no longer needed once it has been uploaded
                    os.remove(image_path)
                except OSError:
                    pass
            session_id = context["session_id"]
            session = self.sessions.session_query(query, session_id)
            logger.debug("[COZE] session query={}".format(session.messages))
            reply_content, err = self._reply_text(session_id, session)
            if err is not None:
                logger.error("[COZE] reply error={}".format(err))
                return Reply(ReplyType.ERROR, "I ran into a problem, please try again later~")
            logger.debug(
                "[COZE] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(
                    session.messages,
                    session_id,
                    reply_content["content"],
                    reply_content["completion_tokens"],
                )
            )
            self.sessions.session_reply(reply_content["content"], session_id,
                                        reply_content["total_tokens"])
            return Reply(ReplyType.TEXT, reply_content["content"])
        else:
            reply = Reply(ReplyType.ERROR, "The bot does not support messages of type {}".format(context.type))
            return reply
  
    def _get_api_base_url(self):
        return conf().get("coze_api_base", "https://api.coze.cn/open_api/v2")

    def _get_headers(self):
        return {
            'Authorization': f"Bearer {conf().get('coze_api_key', '')}"
        }

    def _get_payload(self, user: str, query: str, chat_history: List[dict]):
        return {
            'bot_id': conf().get('coze_bot_id'),
            "user": user,
            "query": query,
            "chat_history": chat_history,
            "stream": False
        }

    def _reply_text(self, session_id: str, session: ChatGPTSession, retry_count=0):
        try:
            query, chat_history = self._convert_messages_format(session.messages)
            base_url = self._get_api_base_url()
            chat_url = f'{base_url}/chat'
            headers = self._get_headers()
            payload = self._get_payload(session.session_id, query, chat_history)
            response = requests.post(chat_url, headers=headers, json=payload)
            if response.status_code != 200:
                error_info = f"[COZE] response text={response.text} status_code={response.status_code}"
                logger.warning(error_info)
                return None, error_info
            answer, err = self._get_completion_content(response)
            if err is not None:
                return None, err
            completion_tokens, total_tokens = self._calc_tokens(session.messages, answer)
            return {
                "total_tokens": total_tokens,
                "completion_tokens": completion_tokens,
                "content": answer
            }, None
        except Exception as e:
            if retry_count < 2:
                time.sleep(3)
                logger.warning(f"[COZE] Exception: {repr(e)}, retry {retry_count + 1}")
                return self._reply_text(session_id, session, retry_count + 1)
            else:
                return None, f"[COZE] Exception: {repr(e)}, maximum number of retries exceeded"
  
    def _convert_messages_format(self, messages) -> Tuple[str, List[dict]]:
        # [
        #   {"role":"user","content":"你好","content_type":"text"},
        #   {"role":"assistant","type":"answer","content":"你好,请问有什么可以帮助你的吗?","content_type":"text"}
        # ]
        chat_history = []
        for message in messages:
            role = message.get('role')
            if role == 'user':
                content = message.get('content')
                chat_history.append({"role": "user", "content": content, "content_type": "text"})
            elif role == 'assistant':
                content = message.get('content')
                chat_history.append({"role": "assistant", "type": "answer", "content": content, "content_type": "text"})
            elif role == 'system':
                # TODO: deal system message
                pass
        # the last message is the current user query; everything before it is history
        user_message = chat_history.pop()
        if user_message.get('role') != 'user' or user_message.get('content', '') == '':
            raise Exception('no user message')
        query = user_message.get('content')
        logger.debug("[COZE] converted coze messages: {}".format([item for item in chat_history]))
        logger.debug("[COZE] user content as query: {}".format(query))
        return query, chat_history

    def _get_completion_content(self, response: Response):
        json_response = response.json()
        if json_response['msg'] != 'success':
            return None, f"[COZE] Error: {json_response['msg']}"
        answer = None
        for message in json_response['messages']:
            if message.get('type') == 'answer':
                answer = message.get('content')
                break
        if not answer:
            return None, "[COZE] Error: empty answer"
        return answer, None

    def _calc_tokens(self, messages, answer):
        # rough token count based on character length
        completion_tokens = len(answer)
        prompt_tokens = 0
        for message in messages:
            prompt_tokens += len(message["content"])
        return completion_tokens, prompt_tokens + completion_tokens

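That is all the code for the feature. Once everything is configured, when you send the bot an image, it is converted into an image URL and passed to the LLM, which gives the bot its "eyes".

Remember to also add to your Coze bot the workflow or "image understanding" plugin you want to process images with.

Finally, restart the bot and you're done.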
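
One detail worth knowing: the reply() code above only treats a query as an image when it ends in .png. If your setup ever saves pictures with another extension, or a user types a message that happens to end in ".png", a slightly stricter check may help. The helpers below are a hypothetical sketch (`is_image_path` and `resolve_local_image` are not part of chatgpt-on-wechat), and whether other extensions actually occur in your deployment is an assumption you should verify in your logs.

```python
import os
from typing import Optional

IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif"}


def is_image_path(query: str) -> bool:
    """True if the query looks like a path to an image file."""
    ext = os.path.splitext(str(query).strip())[1].lower()
    return ext in IMAGE_EXTENSIONS


def resolve_local_image(query: str, base_dir: str) -> Optional[str]:
    """Return the full path if the query names an existing image file, else None."""
    if not is_image_path(query):
        return None
    path = os.path.join(base_dir, str(query).strip())
    return path if os.path.isfile(path) else None


print(is_image_path("tmp/240101-120000.png"))
print(resolve_local_image("no-such-file.png", os.getcwd()))
```

Checking that the file really exists avoids trying to open and upload something that was only ever a text message.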

picture.image

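(Made it this far? How about a follow ↓)


If you don't want to find an API yourself, you can purchase the paid article and then send me your server address by private message or through the official account. I will add it to the whitelist so you can use the service.

Note: because this is a self-hosted service, I can't promise it will be available forever. I can only guarantee that for at least 3 months you can use the image feature without limits.

If few people use it, I may take the service down after 3 months!!!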
