(Attached: an image generated with an Agent platform that is still in beta.)
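Right now, if you want your WeChat bot to "see" images, the only option is a third-party plugin.
Third-party plugins require buying extra tokens, and their prompts are awkward to modify.
After our last study session, almost everyone connected WeChat through Coze. Without image recognition, a good part of Coze's capability goes to waste. With a plugin, you get extra hassle and still have to pay for tokens.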
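Plenty of people have been stuck on this.
On top of that, one guy tipped me 200.
Usually everyone freeloads off me, and for once I was the one freeloading.
For a moment I actually felt a bit guilty.
So, here we go.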
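Giving the bot "vision" comes down to one thing. Convert the image received in WeChat into an image URL and pass that URL to Coze. Inside Coze, a model or plugin that can understand images then does the rest.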
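The conversion step can be sketched as a small function. It assumes the image arrives as a local file path, which is how the Coze bot code in Step 2 treats it.

Some names here are hypothetical: `image_query_to_url`, `IMAGE_EXTS` and the `upload` callable. `upload` stands in for whatever image-hosting endpoint you end up using. The bot code itself only checks for `.png`, so accepting other extensions is an assumption.

```python
import os
import tempfile
from typing import Callable

# Assumption: the real bot code only checks ".png"; the extra extensions are illustrative.
IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".gif")


def image_query_to_url(query: str, base_dir: str, upload: Callable[[bytes], str]) -> str:
    """Return `query` unchanged, or, if it names a local image, the uploaded image's URL."""
    if not query.lower().endswith(IMAGE_EXTS):
        return query  # plain text passes straight through to Coze
    path = os.path.join(base_dir, query)
    with open(path, "rb") as f:
        data = f.read()
    url = upload(data).strip()  # hypothetical uploader: raw bytes in, URL text out
    if not url.startswith(("http://", "https://")):
        raise ValueError(f"upload did not return a URL: {url!r}")
    return url


if __name__ == "__main__":
    # Demo with a fake uploader, so no network access is needed.
    with tempfile.TemporaryDirectory() as d:
        with open(os.path.join(d, "photo.png"), "wb") as f:
            f.write(b"\x89PNG fake bytes")
        fake_upload = lambda data: "https://img.example.com/photo.png"
        print(image_query_to_url("photo.png", d, fake_upload))
        print(image_query_to_url("hello", d, fake_upload))
```

Keeping the uploader as a parameter means the same logic works whether you plug in a public image host, a paid service, or your own server.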
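To convert images, the obvious first choice is an image-hosting platform. So I got a friend to help wire up their APIs, and right in the middle of it we came across this:
That cooled me down instantly...
So in the end I built my own image conversion service.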
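I don't dare to take that legal risk myself, and my small server can't handle everyone's traffic.
So this article shares all the code that implements the feature, but the image service endpoint is left out.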
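There are two ways to use it:
1. Ask Kimi to find you an image conversion API and plug it in, pay for a service, or take the risk on yourself. Any of these will work.
2. If you can't be bothered to find an endpoint and don't want to take the risk either, you can buy the paid article, and I'll whitelist your server.
The tutorial itself is simple: there are two files, and you just replace them.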
Step 1
Open your CoW project, i.e. chatgpt-on-wechat.
In the directory /root/chatgpt-on-wechat/channel/wechat,
find the file wechat_channel.py,
and replace its entire contents with the following code:
```python
# encoding:utf-8

"""
wechat channel
"""

import io
import json
import os
import random
import re
import threading
import time

import requests

from bridge.context import *
from bridge.reply import *
from channel import chat_channel
from channel.chat_channel import ChatChannel
from channel.wechat.wechat_message import *
from common.expired_dict import ExpiredDict
from common.log import logger
from common.singleton import singleton
from common.time_check import time_checker
from config import conf, get_appdata_dir
from lib import itchat
from lib.itchat.content import *


@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING])
def handler_single_msg(msg):
    try:
        cmsg = WechatMessage(msg, False)
    except NotImplementedError as e:
        logger.debug("[WX]single message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_single(cmsg)
    return None


@itchat.msg_register([TEXT, VOICE, PICTURE, NOTE, ATTACHMENT, SHARING], isGroupChat=True)
def handler_group_msg(msg):
    try:
        cmsg = WechatMessage(msg, True)
    except NotImplementedError as e:
        logger.debug("[WX]group message {} skipped: {}".format(msg["MsgId"], e))
        return None
    WechatChannel().handle_group(cmsg)
    return None


def _check(func):
    def wrapper(self, cmsg: ChatMessage):
        msgId = cmsg.msg_id
        if msgId in self.receivedMsgs:
            logger.info("Wechat message {} already received, ignore".format(msgId))
            return
        self.receivedMsgs[msgId] = True
        create_time = cmsg.create_time  # message timestamp
        # skip history messages older than 1 minute
        if conf().get("hot_reload") == True and int(create_time) < int(time.time()) - 60:
            logger.debug("[WX]history message {} skipped".format(msgId))
            return
        if cmsg.my_msg and not cmsg.is_group:
            logger.debug("[WX]my message {} skipped".format(msgId))
            return
        return func(self, cmsg)

    return wrapper


# Available QR-code generation APIs
# https://api.qrserver.com/v1/create-qr-code/?size=400x400&data=https://www.abc.com
# https://api.isoyu.com/qr/?m=1&e=L&p=20&url=https://www.abc.com
def qrCallback(uuid, status, qrcode):
    # logger.debug("qrCallback: {} {}".format(uuid,status))
    if status == "0":
        try:
            from PIL import Image

            img = Image.open(io.BytesIO(qrcode))
            _thread = threading.Thread(target=img.show, args=("QRCode",), daemon=True)
            _thread.start()
        except Exception as e:
            pass

        import qrcode

        url = f"https://login.weixin.qq.com/l/{uuid}"

        qr_api1 = "https://api.isoyu.com/qr/?m=1&e=L&p=20&url={}".format(url)
        qr_api2 = "https://api.qrserver.com/v1/create-qr-code/?size=400x400&data={}".format(url)
        qr_api3 = "https://api.pwmqr.com/qrcode/create/?url={}".format(url)
        qr_api4 = "https://my.tv.sohu.com/user/a/wvideo/getQRCode.do?text={}".format(url)
        print("You can also scan QRCode in any website below:")
        print(qr_api3)
        print(qr_api4)
        print(qr_api2)
        print(qr_api1)
        _send_qr_code([qr_api1, qr_api2, qr_api3, qr_api4])
        qr = qrcode.QRCode(border=1)
        qr.add_data(url)
        qr.make(fit=True)
        qr.print_ascii(invert=True)


@singleton
class WechatChannel(ChatChannel):
    NOT_SUPPORT_REPLYTYPE = []

    def __init__(self):
        super().__init__()
        self.receivedMsgs = ExpiredDict(60 * 60)
        self.auto_login_times = 0

    def startup(self):
        try:
            itchat.instance.receivingRetryCount = 600  # raise the reconnect timeout
            # login by scan QRCode
            hotReload = conf().get("hot_reload", False)
            status_path = os.path.join(get_appdata_dir(), "itchat.pkl")
            itchat.auto_login(
                enableCmdQR=2,
                hotReload=hotReload,
                statusStorageDir=status_path,
                qrCallback=qrCallback,
                exitCallback=self.exitCallback,
                loginCallback=self.loginCallback,
            )
            self.user_id = itchat.instance.storageClass.userName
            self.name = itchat.instance.storageClass.nickName
            logger.info("Wechat login success, user_id: {}, nickname: {}".format(self.user_id, self.name))
            # start message listener
            itchat.run()
        except Exception as e:
            logger.error(e)

    def exitCallback(self):
        try:
            from common.linkai_client import chat_client

            if chat_client.client_id and conf().get("use_linkai"):
                _send_logout()
                time.sleep(2)
                self.auto_login_times += 1
                if self.auto_login_times < 100:
                    chat_channel.handler_pool._shutdown = False
                    self.startup()
        except Exception as e:
            pass

    def loginCallback(self):
        logger.debug("Login success")
        _send_login_success()

    # The handle_* methods build a Context from each received message and pass it to
    # produce(), which processes the Context and sends the reply.
    # A Context carries all of the message's information, with these attributes:
    #   type: message type, including TEXT, VOICE, IMAGE_CREATE
    #   content: message content; for TEXT the text itself, for VOICE the voice file name,
    #            for IMAGE_CREATE the image-generation command
    #   kwargs: dict of extra parameters with these keys:
    #     session_id: session id
    #     isgroup: whether this is a group chat
    #     receiver: who to reply to
    #     msg: the ChatMessage object
    #     origin_ctype: original message type; after speech-to-text, if a private-chat prefix
    #                   match fails, the trigger rules are relaxed if the original was voice
    #     desire_rtype: desired reply type; text by default, ReplyType.VOICE for a voice reply
    @time_checker
    @_check
    def handle_single(self, cmsg: ChatMessage):
        # filter system message
        if cmsg.other_user_id in ["weixin"]:
            return
        if cmsg.ctype == ContextType.VOICE:
            if conf().get("speech_recognition") != True:
                return
            logger.debug("[WX]receive voice msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.IMAGE:
            cmsg.prepare()  # download the image to the local tmp directory
            logger.debug("[WX]receive image msg: {},------{}".format(cmsg.content, cmsg._prepare_fn))
        elif cmsg.ctype == ContextType.PATPAT:
            logger.debug("[WX]receive patpat msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.TEXT:
            logger.debug("[WX]receive text msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
        else:
            logger.debug("[WX]receive msg: {}, cmsg={}".format(cmsg.content, cmsg))
        context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg)
        if context:
            self.produce(context)

    @time_checker
    @_check
    def handle_group(self, cmsg: ChatMessage):
        if cmsg.ctype == ContextType.VOICE:
            if conf().get("group_speech_recognition") != True:
                return
            logger.debug("[WX]receive voice for group msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.IMAGE:
            cmsg.prepare()  # download the image to the local tmp directory
            logger.debug("[WX]receive image for group msg: {}".format(cmsg.content))
        elif cmsg.ctype in [ContextType.JOIN_GROUP, ContextType.PATPAT, ContextType.ACCEPT_FRIEND,
                            ContextType.EXIT_GROUP]:
            logger.debug("[WX]receive note msg: {}".format(cmsg.content))
        elif cmsg.ctype == ContextType.TEXT:
            # logger.debug("[WX]receive group msg: {}, cmsg={}".format(json.dumps(cmsg._rawmsg, ensure_ascii=False), cmsg))
            pass
        elif cmsg.ctype == ContextType.FILE:
            logger.debug(f"[WX]receive attachment msg, file_name={cmsg.content}")
        else:
            logger.debug("[WX]receive group msg: {}".format(cmsg.content))
        context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg)
        if context:
            self.produce(context)

    def _send_image_from_url(self, img_url, receiver):
        """Download an image into memory and send it to the receiver as a picture."""
        logger.debug(f"[WX] start download image, img_url={img_url}")
        pic_res = requests.get(img_url, stream=True)
        image_storage = io.BytesIO()
        size = 0
        for block in pic_res.iter_content(1024):
            size += len(block)
            image_storage.write(block)
        logger.info(f"[WX] download image success, size={size}, img_url={img_url}")
        image_storage.seek(0)
        itchat.send_image(image_storage, toUserName=receiver)
        logger.info("[WX] sendImage url={}, receiver={}".format(img_url, receiver))
        image_storage.close()

    # Unified send function; each Channel implements its own and sends different
    # message types depending on reply.type
    def send(self, reply: Reply, context: Context):
        receiver = context["receiver"]
        if reply.type == ReplyType.TEXT:
            # Split the reply into several WeChat messages on the literal marker "//n"
            split_punctuation = ["//n"]
            pattern = "|".join(map(re.escape, split_punctuation))
            # session_ago is not a standard CoW Reply attribute, so read it defensively
            session_ago = getattr(reply, "session_ago", None)
            assistant_msgs = [con["content"] for con in (session_ago or []) if con["role"] == "assistant"]
            if len(assistant_msgs) > 1:
                # If the new reply repeats the earlier assistant replies, strip the repeated text
                old_messages = [str(c).replace("\n", "").replace("\\n", "") for c in assistant_msgs][:-1]
                old_str_data = " ".join(old_messages)
                if old_str_data in str(reply.content).replace("\n", "").replace("\\n", ""):
                    logger.info(f"[WX] removing repeated history: {old_str_data}")
                    logger.info(f"[WX] reply before: {reply.content}")
                    reply.content = str(reply.content).replace("\n", "").replace("\\n", "").replace(old_str_data, "")
                    logger.info(f"[WX] reply after: {reply.content}")
            split_messages = re.split(pattern, str(reply.content))
            logger.info(f"[WX] session_ago={session_ago}")
            # drop empty segments
            split_messages = [msg.strip() for msg in split_messages if msg.strip() != ""]
            for msg in split_messages:
                if "![Image]" in msg:
                    # Markdown image: ![Image](url)
                    img_urls = re.findall(r"\[Image\]\((.*?)\)", msg)
                elif "你可以点击[这里]" in msg:
                    # Coze's "你可以点击[这里](url)" ("you can click [here](url)") link text
                    img_urls = re.findall(r"\((.*?)\)", msg)
                else:
                    # bare image URL ending in .png / .jpg / .gif
                    img_urls = [m[0] for m in re.findall(r"(http.*?\.(png|jpg|gif))", msg)]
                try:
                    for img_url in img_urls:
                        self._send_image_from_url(img_url.replace("(", "").replace(")", ""), receiver)
                except Exception as e:
                    logger.error(f"[WX] image send failed: {e}")
                # the text segment itself is always sent as well, after any images
                itchat.send(msg, toUserName=receiver)
                logger.info("[WX] sendMsg={}, receiver={}".format(msg, receiver))
                # wait a random 1-2 seconds between messages
                time.sleep(random.uniform(1, 2))
        elif reply.type == ReplyType.ERROR or reply.type == ReplyType.INFO:
            itchat.send(reply.content, toUserName=receiver)
            logger.info("[WX] sendMsg={}, receiver={}".format(reply, receiver))
        elif reply.type == ReplyType.VOICE:
            itchat.send_file(reply.content, toUserName=receiver)
            logger.info("[WX] sendFile={}, receiver={}".format(reply.content, receiver))
        elif reply.type == ReplyType.IMAGE_URL:  # download the image from the web
            self._send_image_from_url(reply.content, receiver)
        elif reply.type == ReplyType.IMAGE:  # read the image from a file
            image_storage = reply.content
            itchat.send_image(image_storage, toUserName=receiver)
            logger.info("[WX] sendImage, receiver={}".format(receiver))
        elif reply.type == ReplyType.FILE:  # new: file reply type
            file_storage = reply.content
            itchat.send_file(file_storage, toUserName=receiver)
            logger.info("[WX] sendFile, receiver={}".format(receiver))
        elif reply.type == ReplyType.VIDEO:  # new: video reply type
            video_storage = reply.content
            itchat.send_video(video_storage, toUserName=receiver)
            logger.info("[WX] sendVideo, receiver={}".format(receiver))
        elif reply.type == ReplyType.VIDEO_URL:  # new: video URL reply type
            video_url = reply.content
            logger.debug(f"[WX] start download video, video_url={video_url}")
            video_res = requests.get(video_url, stream=True)
            video_storage = io.BytesIO()
            size = 0
            for block in video_res.iter_content(1024):
                size += len(block)
                video_storage.write(block)
            logger.info(f"[WX] download video success, size={size}, video_url={video_url}")
            video_storage.seek(0)
            itchat.send_video(video_storage, toUserName=receiver)
            logger.info("[WX] sendVideo url={}, receiver={}".format(video_url, receiver))


def _send_login_success():
    try:
        from common.linkai_client import chat_client

        if chat_client.client_id:
            chat_client.send_login_success()
    except Exception as e:
        pass


def _send_logout():
    try:
        from common.linkai_client import chat_client

        if chat_client.client_id:
            chat_client.send_logout()
    except Exception as e:
        pass


def _send_qr_code(qrcode_list: list):
    try:
        from common.linkai_client import chat_client

        if chat_client.client_id:
            chat_client.send_qrcode(qrcode_list)
    except Exception as e:
        pass
```
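The TEXT branch of `send()` in this file does two things to every Coze reply:

- It splits the text into separate WeChat messages on the literal marker `//n`.
- It pulls image URLs out of each piece so they can be sent as pictures.

Below is a standalone sketch of just that parsing, so you can check your Coze prompt's output format offline. `split_reply` and `image_urls` are hypothetical names, and the `你可以点击[这里]` case is left out.

```python
import re
from typing import List

SPLIT_TOKEN = "//n"  # the marker the modified send() splits on
MD_IMAGE = re.compile(r"\[Image\]\((.*?)\)")          # ![Image](url)
BARE_IMAGE = re.compile(r"(http.*?\.(png|jpg|gif))")  # bare image URL


def split_reply(content: str) -> List[str]:
    """Split a reply on //n and drop empty or whitespace-only pieces."""
    parts = re.split(re.escape(SPLIT_TOKEN), content)
    return [p.strip() for p in parts if p.strip()]


def image_urls(msg: str) -> List[str]:
    """Return the image URLs that send() would download for one piece."""
    if "![Image]" in msg:
        return MD_IMAGE.findall(msg)
    # findall returns (full_match, extension) tuples; keep the full URL
    return [m[0] for m in BARE_IMAGE.findall(msg)]


if __name__ == "__main__":
    for piece in split_reply("Here you go//n![Image](https://a.com/x.png)//n  "):
        print(piece, image_urls(piece))
```

There is one caveat worth checking against your own prompts. A URL whose host starts with "n", such as `https://news.example.com/a.png`, contains `//n`, so the splitter cuts it in two.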
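Step 2
Go to /root/chatgpt-on-wechat/bot/bytedance. If you can't find this directory, go back to the study-group docs and catch up on the Coze API integration tutorial.
Open bytedance_coze_bot.py and paste in all of the code below, replacing its contents.
Note: in the code below, the placeholder URL string passed to requests.post in reply() is the endpoint you need to supply yourself. It was marked in red in the original article.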
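Whatever endpoint you pick, the bot code sends it a `multipart/form-data` POST with a single field named `file`. It then uses the response body (`res.text`) directly as the image URL. So the service must reply with nothing but the URL.

To try a candidate endpoint without CoW, here is a stdlib-only sketch of roughly the same request. `build_multipart` and `upload_image` are hypothetical helpers. Unlike `requests`, this sketch labels the file part `application/octet-stream`.

```python
import urllib.request
import uuid
from typing import Optional, Tuple


def build_multipart(field: str, filename: str, data: bytes,
                    boundary: Optional[str] = None) -> Tuple[bytes, str]:
    """Encode one file field as multipart/form-data; return (body, Content-Type header)."""
    boundary = boundary or uuid.uuid4().hex
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; filename="{filename}"\r\n'
        "Content-Type: application/octet-stream\r\n"
        "\r\n"
    ).encode("utf-8")
    tail = f"\r\n--{boundary}--\r\n".encode("utf-8")
    return head + data + tail, f"multipart/form-data; boundary={boundary}"


def upload_image(endpoint: str, path: str, timeout: float = 30) -> str:
    """POST the file as field "file" and return the response body as the image URL."""
    with open(path, "rb") as f:
        body, content_type = build_multipart("file", "image.jpg", f.read())
    req = urllib.request.Request(endpoint, data=body, method="POST",
                                 headers={"Content-Type": content_type})
    # urlopen raises HTTPError for non-2xx responses
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read().decode("utf-8").strip()
```

For example, `upload_image("https://your-endpoint.example/upload", "tmp/test.png")` should print a bare `http(s)://...` URL if the endpoint fits this bot. Both arguments are placeholders.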
```python
# encoding:utf-8

import os
import time
from typing import List, Tuple

import requests
from requests import Response

from bot.bot import Bot
from bot.chatgpt.chat_gpt_session import ChatGPTSession
from bot.session_manager import SessionManager
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
from config import conf

# Replace with your own image conversion endpoint. It receives a multipart "file"
# field and must return the public image URL as a plain-text response body.
IMAGE_UPLOAD_URL = "<image conversion service URL>"


class ByteDanceCozeBot(Bot):
    def __init__(self):
        super().__init__()
        self.sessions = SessionManager(ChatGPTSession, model=conf().get("model") or "coze")

    def reply(self, query, context=None):
        # acquire reply content
        if context.type == ContextType.TEXT:
            logger.info("[COZE] query={}".format(query))
            if str(query).endswith(".png"):
                # The query is a locally saved image: upload it and use the returned URL as the query
                image_path = os.path.join(os.getcwd(), query)
                with open(image_path, "rb") as f:
                    img_content = f.read()
                res = requests.post(IMAGE_UPLOAD_URL, files={"file": ("image.jpg", img_content)})
                if not res.ok:
                    return Reply(ReplyType.ERROR, "Image upload failed")
                img_on_url = res.text.strip()
                logger.info(f"[COZE] converted image URL: {img_on_url}")
                query = img_on_url
                try:
                    os.remove(image_path)  # the local copy is no longer needed
                except OSError:
                    pass
            session_id = context["session_id"]
            session = self.sessions.session_query(query, session_id)
            logger.debug("[COZE] session query={}".format(session.messages))
            reply_content, err = self._reply_text(session_id, session)
            if err is not None:
                logger.error("[COZE] reply error={}".format(err))
                return Reply(ReplyType.ERROR, "I ran into a problem, please try again later~")
            logger.debug(
                "[COZE] new_query={}, session_id={}, reply_cont={}, completion_tokens={}".format(
                    session.messages,
                    session_id,
                    reply_content["content"],
                    reply_content["completion_tokens"],
                )
            )
            self.sessions.session_reply(reply_content["content"], session_id, reply_content["total_tokens"])
            return Reply(ReplyType.TEXT, reply_content["content"])
        else:
            reply = Reply(ReplyType.ERROR, "Bot does not support {} messages".format(context.type))
            return reply

    def _get_api_base_url(self):
        return conf().get("coze_api_base", "https://api.coze.cn/open_api/v2")

    def _get_headers(self):
        return {"Authorization": f"Bearer {conf().get('coze_api_key', '')}"}

    def _get_payload(self, user: str, query: str, chat_history: List[dict]):
        return {
            "bot_id": conf().get("coze_bot_id"),
            "user": user,
            "query": query,
            "chat_history": chat_history,
            "stream": False,
        }

    def _reply_text(self, session_id: str, session: ChatGPTSession, retry_count=0):
        try:
            query, chat_history = self._convert_messages_format(session.messages)
            base_url = self._get_api_base_url()
            chat_url = f"{base_url}/chat"
            headers = self._get_headers()
            payload = self._get_payload(session.session_id, query, chat_history)
            response = requests.post(chat_url, headers=headers, json=payload)
            if response.status_code != 200:
                error_info = f"[COZE] response text={response.text} status_code={response.status_code}"
                logger.warning(error_info)
                return None, error_info
            answer, err = self._get_completion_content(response)
            if err is not None:
                return None, err
            completion_tokens, total_tokens = self._calc_tokens(session.messages, answer)
            return {
                "total_tokens": total_tokens,
                "completion_tokens": completion_tokens,
                "content": answer,
            }, None
        except Exception as e:
            if retry_count < 2:
                time.sleep(3)
                logger.warning(f"[COZE] Exception: {repr(e)}, retry #{retry_count + 1}")
                return self._reply_text(session_id, session, retry_count + 1)
            else:
                return None, f"[COZE] Exception: {repr(e)}, max retries exceeded"

    def _convert_messages_format(self, messages) -> Tuple[str, List[dict]]:
        # [
        #     {"role":"user","content":"Hello","content_type":"text"},
        #     {"role":"assistant","type":"answer","content":"Hello, how can I help you?","content_type":"text"}
        # ]
        chat_history = []
        for message in messages:
            role = message.get("role")
            if role == "user":
                content = message.get("content")
                chat_history.append({"role": "user", "content": content, "content_type": "text"})
            elif role == "assistant":
                content = message.get("content")
                chat_history.append({"role": "assistant", "type": "answer", "content": content, "content_type": "text"})
            elif role == "system":
                # TODO: deal system message
                pass
        # the last user message becomes the query; the rest is history
        user_message = chat_history.pop()
        if user_message.get("role") != "user" or user_message.get("content", "") == "":
            raise Exception("no user message")
        query = user_message.get("content")
        logger.debug("[COZE] converted coze messages: {}".format([item for item in chat_history]))
        logger.debug("[COZE] user content as query: {}".format(query))
        return query, chat_history

    def _get_completion_content(self, response: Response):
        json_response = response.json()
        if json_response["msg"] != "success":
            return None, f"[COZE] Error: {json_response['msg']}"
        answer = None
        for message in json_response["messages"]:
            if message.get("type") == "answer":
                answer = message.get("content")
                break
        if not answer:
            return None, "[COZE] Error: empty answer"
        return answer, None

    def _calc_tokens(self, messages, answer):
        # rough token count: character length
        completion_tokens = len(answer)
        prompt_tokens = 0
        for message in messages:
            prompt_tokens += len(message["content"])
        return completion_tokens, prompt_tokens + completion_tokens
```
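That's all the code for the feature. Once everything is configured, whenever you send the bot an image, it converts the image into an image URL and sends that URL to the LLM. That is how the bot gets its "eyes".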
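To see what actually reaches Coze once the image has been converted, here is a sketch that reproduces `_convert_messages_format` and `_get_payload` outside CoW. The function name and the `BOT` / `u1` values are hypothetical.

The point is that the image arrives as an ordinary text `query` containing a URL. Coze itself has to fetch and interpret it.

```python
from typing import Dict, List


def to_coze_v2_request(messages: List[Dict], bot_id: str, user: str) -> Dict:
    """Build the /open_api/v2/chat body: the last user turn becomes the query."""
    history = []
    for m in messages:
        if m["role"] == "user":
            history.append({"role": "user", "content": m["content"], "content_type": "text"})
        elif m["role"] == "assistant":
            history.append({"role": "assistant", "type": "answer",
                            "content": m["content"], "content_type": "text"})
        # system messages are skipped, as in the bot code
    if not history or history[-1]["role"] != "user" or not history[-1]["content"]:
        raise ValueError("no user message")
    last = history.pop()
    return {"bot_id": bot_id, "user": user, "query": last["content"],
            "chat_history": history, "stream": False}


if __name__ == "__main__":
    session = [
        {"role": "system", "content": "You are helpful"},
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
        {"role": "user", "content": "https://img.example.com/a.png"},
    ]
    print(to_coze_v2_request(session, "BOT", "u1"))
```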
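Remember to also add a workflow or an "image understanding" plugin to your Coze bot to handle whatever you want done with the image.
Finally, restart the bot and you're done.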
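Note: since this is a self-hosted service, I can't promise it will be available forever. I can only guarantee that for at least the next 3 months you can use the image feature without limits.
If few people use it, I may take the service down after 3 months!!!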
