模型上下文协议MCP与Ollama的整合实现指南
在过去一两个个月里,模型上下文协议(Model Context Protocol,MCP)频繁出现在各种技术微信交流群中。我们已经看到了许多很酷的集成案例,大家似乎相信这个标准会长期存在,因为它为大模型与工具或软件的集成设立了规范。
前面一篇文章给大家分享了MCP一些基础概念,但是读完之后还是模棱两可,所以决定尝试将Ollama中的小型语言模型与MCP服务器连接起来,体验一下这个新标准。今天,向大家展示如何实现Ollama与MCP服务器的集成。
实现步骤
整个集成的主要步骤包括:
- 创建测试以及使用MCP服务
- 创建客户端文件来发送请求并启动服务
- 从服务获取工具到客户端
- 将工具转换为pydantic模型
- 通过response format将工具(作为pydantic模型)传递给Ollama
- 通过Ollama发送对话并接收结构化输出
- 如果响应中包含工具,则向服务器发送请求
安装依赖
要运行这个项目,需要安装必要的包。fastmcp库在使用uv运行代码时效果最佳。uv很容易下载和使用,类似于Poetry和pip。
使用以下命令将所需库添加到你的项目中:
uv add fastmcp ollama
这会同时安装MCP服务器和Ollama聊天库,以便你在它们的基础上构建客户端和服务器逻辑。
文件结构
设置时,你的文件夹应该是这样的:
your folder
├── server.py
└── client.py
server.py
文件包含MCP服务器和想要暴露的工具。client.py
文件在后台进程中启动服务器,获取可用工具,并与Ollama连接。
示例MCP服务器
首先,让我们使用fastmcp库创建一个简单的MCP服务器。该服务器暴露了一个名为magicoutput
的工具。这个函数接受两个字符串输入并返回一个固定的字符串作为输出。
@mcp.tool()
装饰器用于将函数注册为MCP服务器中的可用工具。当服务器启动后,任何客户端都可以获取并调用这个工具。
通过在主块中调用mcp.run()
来启动服务器。
# server.py
from fastmcp import FastMCP
# 创建MCP服务器
mcp = FastMCP("TestServer")
# 我的工具:
@mcp.tool()
def magicoutput(obj1: str, obj2: str) -> int:
"""使用此函数获取魔法输出"""
print(f"输入参数:obj1:{obj1},obj2:{obj2}")
return f"输入参数:obj1:{obj1},obj2:{obj2},魔法输出:Hello MCP,MCP Hello"
if \_\_name\_\_ == "\_\_main\_\_":
mcp.run()
我们运行下面命令,进行调试服务端的工具:
fastmcp dev server.py
输入日志如下:
Need to install the following packages:
@modelcontextprotocol/inspector@0.10.2
Ok to proceed? (y) y
Starting MCP inspector...
⚙️ Proxy server listening on port 6277
🔍 MCP Inspector is up and running at http://127.0.0.1:6274 🚀
New SSE connection
Query parameters: [Object:
本地访问页面http://127.0.0.1:6274/#tools
,我们可以看构造的函数,并且可以调试
获取服务器工具
为了连接到MCP服务器并列出可用工具,我们使用来自mcp库的ClientSession
、StdioServerParameters
和stdio\_client
。
我们定义一个名为OllamaMCP
的类来处理服务器连接和工具获取。在类内部,\_async\_run
方法启动异步会话,初始化它,并从服务器获取工具列表。
我们使用threading.Event()
来跟踪会话何时准备就绪,并将工具列表存储在self.tools
中。
在脚本末尾,我们定义服务器参数并在后台线程中运行客户端。这会启动连接并打印服务器返回的工具元数据。
# client.py
import asyncio
import threading
from pathlib import Path
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio\_client
from typing import Any
class OllamaMCP:
"""
Ollama和FastMCP的简单集成
"""
def \_\_init\_\_(self, server\_params: StdioServerParameters):
self.server\_params = server\_params
self.initialized = threading.Event()
self.tools: list[Any] = []
def \_run\_background(self):
asyncio.run(self.\_async\_run())
async def \_async\_run(self):
try:
async with stdio\_client(self.server\_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
self.session = session
tools\_result = await session.list\_tools()
self.tools = tools\_result.tools
print(tools\_result)
except Exception as e:
print(f"启动MCP服务器时出错 {str(e)}")
if \_\_name\_\_ == "\_\_main\_\_":
server\_parameters = StdioServerParameters(
command="uv",
args=["run", "python", "server.py"],
cwd=str(Path.cwd())
)
ollamamcp = OllamaMCP(server\_params=server\_parameters)
ollamamcp.\_run\_background()
运行上面的代码后,你会从服务器得到以下响应,其中可以看到服务器上可用的工具列表。
[04/19/25 12:10:47] INFO Starting server "TestServer"... server.py:261
meta=None nextCursor=None tools=[Tool(name='magicoutput', description='使用此函数获取魔法输出', inputSchema={'properties': {'obj1': {'title': 'Obj1', 'type': 'string'}, 'obj2': {'title': 'Obj2', 'type': 'string'}}, 'required': ['obj1', 'obj2'], 'title': 'magicoutputArguments', 'type': 'object'})]
将工具转换为pydantic模型
现在我们已经从服务器接收到了工具列表,下一步是将它们转换为Pydantic模型。我们使用Pydantic的create\_model
来动态定义新的响应模式,基于服务器的工具定义。还有一个辅助函数来将JSON类型映射到有效的Python类型。
Pydantic 是一个用于数据验证和序列化的 Python 模型库。它在 FastAPI 中广泛使用,用于定义请求体、响应体和其他数据模型,提供了强大的类型检查和自动文档生成功能
这帮助我们动态定义模型,使语言模型确切知道在返回工具参数时应使用什么结构。
# client.py
import asyncio
import threading
from pathlib import Path
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio\_client
from typing import Any, Union, Optional
from pydantic import BaseModel, create\_model, Field
class OllamaMCP:
"""Ollama和FastMCP的简单集成"""
def \_\_init\_\_(self, server\_params: StdioServerParameters):
self.server\_params = server\_params
self.initialized = threading.Event()
self.tools: list[Any] = []
def \_run\_background(self):
asyncio.run(self.\_async\_run())
async def \_async\_run(self):
try:
async with stdio\_client(self.server\_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
self.session = session
tools\_result = await session.list\_tools()
self.tools = tools\_result.tools
except Exception as e:
print(f"启动MCP服务器时出错 {str(e)}")
def create\_response\_model(self):
dynamic\_classes = {}
for tool in self.tools:
class\_name = tool.name.capitalize()
properties = {}
for prop\_name, prop\_info in tool.inputSchema.get("properties", {}).items():
json\_type = prop\_info.get("type", "string")
properties[prop\_name] = self.convert\_json\_type\_to\_python\_type(json\_type)
model = create\_model(
class\_name,
\_\_base\_\_=BaseModel,
\_\_doc\_\_=tool.description,
**properties,
)
dynamic\_classes[class\_name] = model
if dynamic\_classes:
all\_tools\_type = Union[tuple(dynamic\_classes.values())]
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
\_\_doc\_\_="LLm响应类",
response=(str, Field(..., description= "向用户确认函数将被调用。")),
tool=(all\_tools\_type, Field(
...,
description="用于运行和获取魔法输出的工具"
)),
)
else:
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
\_\_doc\_\_="LLm响应类",
response=(str, ...),
tool=(Optional[Any], Field(None, description="如果不返回None则使用的工具")),
)
self.response\_model = Response
print(Response.model\_fields)
@staticmethod
def convert\_json\_type\_to\_python\_type(json\_type: str):
"""简单地将JSON类型映射到Python(Pydantic)类型。"""
if json\_type == "integer":
return (int, ...)
if json\_type == "number":
return (float, ...)
if json\_type == "string":
return (str, ...)
if json\_type == "boolean":
return (bool, ...)
return (str, ...)
if \_\_name\_\_ == "\_\_main\_\_":
server\_parameters = StdioServerParameters(
command="uv",
args=["run", "python", "server.py"],
cwd=str(Path.cwd())
)
ollamamcp = OllamaMCP(server\_params=server\_parameters)
ollamamcp.\_run\_background()
ollamamcp.create\_response\_model()
运行代码后,print(Response.model\_fields)
的输出显示了我们刚刚构建的响应模型的完整结构。这个模型包括两部分:一部分是助手发送给用户的消息,另一部分是可选字段,保存工具参数。
如果模型填充了工具字段,我们将使用它来调用服务器。否则,我们只使用普通的响应字符串。
[04/19/25 12:17:20] INFO Starting server "TestServer"... server.py:261
{'response': FieldInfo(annotation=str, required=True, description='向用户确认函数将被调用。'), 'tool': FieldInfo(annotation=Magicoutput, required=True, description='用于运行和获取魔法输出的工具')}
使用后台线程和队列调用工具
现在工具可以作为pydantic模型使用了,我们可以继续并启用工具调用。为此,我们使用后台线程并设置两个队列。一个用于向服务器发送请求,另一个用于接收响应。
call\_tool
方法将请求放入队列,后台线程监听该请求。一旦使用MCP会话调用工具,结果就会放入响应队列。
import asyncio
import threading
import threading
import queue
from pathlib import Path
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio\_client
from typing import Any, Union, Optional
from pydantic import BaseModel, create\_model, Field
class OllamaMCP:
"""Ollama和FastMCP的简单集成"""
def \_\_init\_\_(self, server\_params: StdioServerParameters):
self.server\_params = server\_params
self.initialized = threading.Event()
self.tools: list[Any] = []
self.request\_queue = queue.Queue()
self.response\_queue = queue.Queue()
# 启动后台线程异步处理请求
self.thread = threading.Thread(target=self.\_run\_background, daemon=True)
self.thread.start()
def \_run\_background(self):
asyncio.run(self.\_async\_run())
async def \_async\_run(self):
try:
async with stdio\_client(self.server\_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
self.session = session
tools\_result = await session.list\_tools()
self.tools = tools\_result.tools
self.initialized.set()
while True:
try:
tool\_name, arguments = self.request\_queue.get(block=False)
except queue.Empty:
await asyncio.sleep(0.01)
continue
if tool\_name is None:
print("收到关闭信号。")
break
try:
result = await session.call\_tool(tool\_name, arguments)
self.response\_queue.put(result)
except Exception as e:
self.response\_queue.put(f"错误: {str(e)}")
except Exception as e:
print("MCP会话初始化错误:", str(e))
self.initialized.set() # 即使初始化失败也解除等待线程的阻塞
self.response\_queue.put(f"MCP初始化错误: {str(e)}")
def call\_tool(self, tool\_name: str, arguments: dict[str, Any]) -> Any:
"""
发布工具调用请求并等待结果
"""
if not self.initialized.wait(timeout=30):
raise TimeoutError("MCP会话未能及时初始化。")
self.request\_queue.put((tool\_name, arguments))
result = self.response\_queue.get()
return result
def shutdown(self):
"""
干净地关闭持久会话
"""
self.request\_queue.put((None, None))
self.thread.join()
print("持久MCP会话已关闭。")
def create\_response\_model(self):
dynamic\_classes = {}
for tool in self.tools:
class\_name = tool.name.capitalize()
properties = {}
for prop\_name, prop\_info in tool.inputSchema.get("properties", {}).items():
json\_type = prop\_info.get("type", "string")
properties[prop\_name] = self.convert\_json\_type\_to\_python\_type(json\_type)
model = create\_model(
class\_name,
\_\_base\_\_=BaseModel,
\_\_doc\_\_=tool.description,
**properties,
)
dynamic\_classes[class\_name] = model
if dynamic\_classes:
all\_tools\_type = Union[tuple(dynamic\_classes.values())]
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
response=(str, ...),
tool=(Optional[all\_tools\_type], Field(None, description="如果不返回None则使用的工具")),
)
else:
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
response=(str, ...),
tool=(Optional[Any], Field(None, description="如果不返回None则使用的工具")),
)
self.response\_model = Response
@staticmethod
def convert\_json\_type\_to\_python\_type(json\_type: str):
"""简单地将JSON类型映射到Python(Pydantic)类型。"""
if json\_type == "integer":
return (int, ...)
if json\_type == "number":
return (float, ...)
if json\_type == "string":
return (str, ...)
if json\_type == "boolean":
return (bool, ...)
return (str, ...)
if \_\_name\_\_ == "\_\_main\_\_":
server\_parameters = StdioServerParameters(
command="uv",
args=["run", "python", "server.py"],
cwd=str(Path.cwd())
)
ollamamcp = OllamaMCP(server\_params=server\_parameters)
if ollamamcp.initialized.wait(timeout=30):
print("准备调用工具。")
result = ollamamcp.call\_tool(
tool\_name="magicoutput",
arguments={"obj1": "dog", "obj2": "cat"}
)
print(result)
else:
print("错误: 初始化超时。")
请注意,在这个阶段,我们正在使用call\_tool
方法手动传递函数名称和参数。在下一节中,我们将根据Ollama返回的结构化输出触发这个调用。
运行此代码后,我们可以确认一切正常。工具被服务器正确识别、执行,并返回结果。
[04/19/25 12:18:26] INFO Starting server "TestServer"... server.py:261
准备调用工具。
meta=None content=[TextContent(type='text', text='输入参数:obj1:dog,obj2:cat,魔法输出:Hello MCP,MCP Hello', annotations=None)] isError=False
Ollama + MCP结合
首先我们先通过Ollama部署一个大模型服务,这里我们下gemma3
下面代码中,我是设置的局域网的ip
client = Client(
host='http://192.168.1.5:11434',
headers={'x-some-header': 'some-value'}
)
有了队列和call\_tool
函数,现在是时候该集成Ollama了。我们将响应类传递到Ollama的format字段中,告诉我们的语言模型(这里是Gemma)在生成输出时遵循该模式。
我们还定义了一个ollama\_chat
方法,用于发送对话,验证模型的响应是否符合模式,并检查是否包含工具。如果是,它提取函数名称和参数,然后使用在后台线程中的持久MCP会话调用它。
在main函数中,我们设置服务器连接,启动后台循环,并等待一切就绪。然后我们准备系统提示和用户消息,将它们发送给Ollama,并等待结构化输出。
最后,我们打印服务器的结果并关闭会话。
import asyncio
import threading
import queue
from pathlib import Path
from typing import Any, Optional, Union
from pydantic import BaseModel, Field, create\_model
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio\_client
from ollama import chat,Client
client = Client(
host='http://192.168.1.5:11434',
headers={'x-some-header': 'some-value'}
)
class OllamaMCP:
def \_\_init\_\_(self, server\_params: StdioServerParameters):
self.server\_params = server\_params
self.request\_queue = queue.Queue()
self.response\_queue = queue.Queue()
self.initialized = threading.Event()
self.tools: list[Any] = []
self.thread = threading.Thread(target=self.\_run\_background, daemon=True)
self.thread.start()
def \_run\_background(self):
asyncio.run(self.\_async\_run())
async def \_async\_run(self):
try:
async with stdio\_client(self.server\_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
self.session = session
tools\_result = await session.list\_tools()
self.tools = tools\_result.tools
self.initialized.set()
while True:
try:
tool\_name, arguments = self.request\_queue.get(block=False)
except queue.Empty:
await asyncio.sleep(0.01)
continue
if tool\_name is None:
break
try:
result = await session.call\_tool(tool\_name, arguments)
self.response\_queue.put(result)
except Exception as e:
self.response\_queue.put(f"错误: {str(e)}")
except Exception as e:
print("MCP会话初始化错误:", str(e))
self.initialized.set() # 即使初始化失败也解除等待线程的阻塞
self.response\_queue.put(f"MCP初始化错误: {str(e)}")
def call\_tool(self, tool\_name: str, arguments: dict[str, Any]) -> Any:
"""
发布工具调用请求并等待结果
"""
if not self.initialized.wait(timeout=30):
raise TimeoutError("MCP会话未能及时初始化。")
self.request\_queue.put((tool\_name, arguments))
result = self.response\_queue.get()
return result
def shutdown(self):
"""
干净地关闭持久会话
"""
self.request\_queue.put((None, None))
self.thread.join()
print("持久MCP会话已关闭。")
@staticmethod
def convert\_json\_type\_to\_python\_type(json\_type: str):
"""简单地将JSON类型映射到Python(Pydantic)类型。"""
if json\_type == "integer":
return (int, ...)
if json\_type == "number":
return (float, ...)
if json\_type == "string":
return (str, ...)
if json\_type == "boolean":
return (bool, ...)
return (str, ...)
def create\_response\_model(self):
"""
基于获取的工具创建动态Pydantic响应模型
"""
dynamic\_classes = {}
for tool in self.tools:
class\_name = tool.name.capitalize()
properties: dict[str, Any] = {}
for prop\_name, prop\_info in tool.inputSchema.get("properties", {}).items():
json\_type = prop\_info.get("type", "string")
properties[prop\_name] = self.convert\_json\_type\_to\_python\_type(json\_type)
model = create\_model(
class\_name,
\_\_base\_\_=BaseModel,
\_\_doc\_\_=tool.description,
**properties,
)
dynamic\_classes[class\_name] = model
if dynamic\_classes:
all\_tools\_type = Union[tuple(dynamic\_classes.values())]
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
\_\_doc\_\_="LLm响应类",
response=(str, Field(..., description= "向用户确认函数将被调用。")),
tool=(all\_tools\_type, Field(
...,
description="用于运行和获取魔法输出的工具"
)),
)
else:
Response = create\_model(
"Response",
\_\_base\_\_=BaseModel,
\_\_doc\_\_="LLm响应类",
response=(str, ...),
tool=(Optional[Any], Field(None, description="如果不返回None则使用的工具")),
)
self.response\_model = Response
async def ollama\_chat(self, messages: list[dict[str, str]]) -> Any:
"""
使用动态响应模型向Ollama发送消息。
如果在响应中检测到工具,则使用持久会话调用它。
"""
conversation = [{"role":"assistant", "content": f"你必须使用工具。你可以使用以下函数:{[ tool.name for tool in self.tools]}"}]
conversation.extend(messages)
if self.response\_model is None:
raise ValueError("响应模型尚未创建。请先调用create\_response\_model()。")
# 获取聊天消息格式的JSON模式
format\_schema = self.response\_model.model\_json\_schema()
# 调用Ollama(假定是同步的)并解析响应
response = client.chat(
model="gemma3:latest",
messages=conversation,
format=format\_schema
)
print("Ollama响应", response.message.content)
response\_obj = self.response\_model.model\_validate\_json(response.message.content)
maybe\_tool = response\_obj.tool
if maybe\_tool:
function\_name = maybe\_tool.\_\_class\_\_.\_\_name\_\_.lower()
func\_args = maybe\_tool.model\_dump()
# 使用asyncio.to\_thread在线程中调用同步的call\_tool方法
output = await asyncio.to\_thread(self.call\_tool, function\_name, func\_args)
return output
else:
print("响应中未检测到工具。返回纯文本响应。")
return response\_obj.response
async def main():
server\_parameters = StdioServerParameters(
command="uv",
args=["run", "python", "server.py"],
cwd=str(Path.cwd())
)
# 创建持久会话
persistent\_session = OllamaMCP(server\_parameters)
# 等待会话完全初始化
if persistent\_session.initialized.wait(timeout=30):
print("准备调用工具。")
else:
print("错误: 初始化超时。")
# 从获取的工具创建动态响应模型
persistent\_session.create\_response\_model()
# 准备给Ollama的消息
messages = [
{
"role": "system",
"content": (
"你是一个听话的助手,上下文中有一系列工具。"
"你的任务是使用这个函数获取魔法输出。"
"不要自己生成魔法输出。"
"简洁地回复一条简短消息,提及调用函数,"
"但不提供函数输出本身。"
"将该简短消息放在'response'属性中。"
"例如:'好的,我会运行magicoutput函数并返回输出。'"
"同时用正确的参数填充'tool'属性。"
)
},
{
"role": "user",
"content": "使用函数获取这些参数的魔法输出(obj1 = Ollama和obj2 = Gemma3)"
}
]
# 调用Ollama并处理响应
result = await persistent\_session.ollama\_chat(messages)
print("最终结果:", result)
# 完成后关闭持久会话
persistent\_session.shutdown()
if \_\_name\_\_ == "\_\_main\_\_":
asyncio.run(main())
你可以看到输出工作得非常完美。我们收到了一个带有模型简短消息的响应,然后是一个带有将发送到MCP服务器的参数的工具。最后,我们得到了来自服务器的输出,如下所示:
[04/19/25 12:39:55] INFO Starting server "TestServer"... server.py:261
准备调用工具。
Ollama响应 {
"response": "好的,我将使用magicoutput函数获取obj1和obj2的魔法输出。",
"tool": {"obj1": "Wombat", "obj2": "Dog"}
}
最终结果: meta=None content=[TextContent(type='text', text='输入参数:obj1:Wombat,obj2:Dog,魔法输出:Hello MCP,MCP Hello', annotations=None)] isError=False
持久MCP会话已关闭。
下一篇,我们讲一下如何构造一个arxiv相关的mcp
添加微信,备注” LLM “进入大模型技术交流群
如果你觉得这篇文章对你有帮助,别忘了点个赞、送个喜欢
/ 作者:致Great
/ 作者:欢迎转载,标注来源即可