
import asyncio
import json
import os
from contextlib import AsyncExitStack
from typing import Optional

from anthropic import Anthropic
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
class MCPClient:
    """Interactive chat client bridging an OpenAI-compatible model and an MCP server.

    The client launches an MCP server script over stdio, advertises the
    server's tools to the chat model, executes every tool call the model
    requests and feeds the results back until the model produces a final
    answer.
    """

    # System prompt sent with every query (runtime text, kept verbatim).
    _SYSTEM_PROMPT = "你是一位优秀的职业交易员, 擅长通过价格行为对市场进行分析, 属于波段交易者"

    def __init__(self):
        # Load variables from a .env file (if one exists) before the
        # credentials below are read from the environment.
        load_dotenv()
        # Set by connect_to_server(); None until a server is connected.
        self.session: Optional[ClientSession] = None
        # Owns the stdio transport and the session so cleanup() can close both.
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY", ""),
            base_url=os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com"),
        )
        self.model = "deepseek-chat"

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server launched as a stdio subprocess.

        Args:
            server_script_path: Path to the server script (.py or .js)

        Raises:
            ValueError: If the path ends in neither ``.py`` nor ``.js``.
        """
        is_python = server_script_path.endswith(".py")
        is_js = server_script_path.endswith(".js")
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None,
        )

        # Both context managers are registered on the exit stack so that
        # cleanup() tears them down in reverse order.
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        await self.session.initialize()

        response = await self.session.list_tools()
        print(
            "\nConnected to server with tools:",
            [tool.name for tool in response.tools],
        )

    @staticmethod
    def _parse_tool_arguments(raw_arguments):
        """Decode the JSON argument string of a tool call into a dict.

        Args:
            raw_arguments: The ``function.arguments`` string produced by the
                model; may be empty or None for a call without arguments.

        Returns:
            The decoded arguments, or an empty dict when none were given.

        Raises:
            ValueError: If the string is not valid JSON (json.JSONDecodeError
                is a ValueError subclass) or does not decode to an object.
        """
        if not raw_arguments:
            return {}
        # SECURITY: this string is model output. It was previously passed to
        # eval(), which executes arbitrary code and also fails on the JSON
        # literals true/false/null. json.loads only parses data.
        parsed = json.loads(raw_arguments)
        if not isinstance(parsed, dict):
            raise ValueError(
                f"Tool arguments must be a JSON object, got {type(parsed).__name__}"
            )
        return parsed

    def _create_completion(self, messages, available_tools):
        """Request one chat completion for the current conversation."""
        request = {"model": self.model, "messages": messages}
        # Only advertise tools when the server actually exposes some.
        if available_tools:
            request["tools"] = available_tools
            request["tool_choice"] = "auto"
        # NOTE: self.client is the synchronous OpenAI client, so this call
        # blocks until the response arrives.
        return self.client.chat.completions.create(**request)

    async def process_query(self, query: str) -> str:
        """Handle one user query, calling the model and any tools it requests.

        Args:
            query: The user's input text.

        Returns:
            The model's final message text, or an empty string when the
            final message carries no text content.

        Raises:
            RuntimeError: If no MCP session has been established yet.
        """
        if self.session is None:
            raise RuntimeError(
                "Not connected to an MCP server; call connect_to_server() first"
            )

        # The system prompt goes first so it frames the user's message.
        messages = [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": query},
        ]

        listing = await self.session.list_tools()
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in listing.tools
        ]

        completion = self._create_completion(messages, available_tools)
        while completion.choices[0].finish_reason == "tool_calls":
            tool_calls = completion.choices[0].message.tool_calls or []
            if not tool_calls:
                # Nothing to execute; asking again would repeat the same request.
                break

            # Record the assistant turn once, then answer every requested
            # call so no tool_call_id is left without a matching result.
            messages.append({"role": "assistant", "tool_calls": list(tool_calls)})
            for tool_call in tool_calls:
                tool_args = self._parse_tool_arguments(tool_call.function.arguments)
                tool_result = await self.session.call_tool(
                    tool_call.function.name, tool_args
                )
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(tool_result.content),
                })

            completion = self._create_completion(messages, available_tools)

        # content can be None; normalise so callers always receive a str.
        return completion.choices[0].message.content or ""

    async def chat_loop(self):
        """Run an interactive chat loop until the user types 'quit' or stdin ends."""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == "quit":
                    break
                if not query:
                    # Skip blank lines instead of sending an empty query.
                    continue
                response = await self.process_query(query)
                print("\n" + response)
            except EOFError:
                # stdin was closed; the generic handler below would
                # otherwise catch this on every iteration and loop forever.
                break
            except Exception as e:
                # Report the failure and keep the session alive.
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources by closing everything on the exit stack."""
        await self.exit_stack.aclose()
async def main():
    """Connect to the server script named on the command line and run the chat loop.

    Reads the server script path from ``sys.argv[1]``. When it is missing,
    prints a usage line and exits with status 1. The client is always
    cleaned up, even when connecting or chatting raises.
    """
    # Imported here so main() also works when called from another module;
    # the file otherwise only imports sys inside the __main__ guard.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()
if __name__ == "__main__":
    # Runs only when the file is executed as a script: import sys, then
    # drive main() to completion with asyncio.run.
    import sys

    asyncio.run(main())
|