import asyncio
import json
import os
import sys
from contextlib import AsyncExitStack
from typing import Optional

from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI

# Load OPENAI_API_KEY / OPENAI_BASE_URL from a local .env file, if present.
load_dotenv()
class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # DeepSeek exposes an OpenAI-compatible endpoint, so the OpenAI SDK is reused here.
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY", ""),
            base_url=os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com"),
        )
        self.model = "deepseek-chat"
    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server.

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])
    async def process_query(self, query: str) -> str:
        """Handle user input by calling the model and any tools it requests."""
        messages = [
            {
                "role": "system",
                "content": (
                    "You are a skilled professional trader who analyzes the market "
                    "through price action and trades on a swing-trading horizon."
                ),
            },
            {"role": "user", "content": query},
        ]

        response = await self.session.list_tools()
        # Convert the MCP tool definitions into OpenAI-style function-calling schemas.
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema,
                },
            }
            for tool in response.tools
        ]
        final_text = []
        tool_results = []
        completion = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools,
            tool_choice="auto",
        )
        # Keep resolving tool calls until the model stops requesting them.
        while completion.choices[0].finish_reason == "tool_calls":
            choice = completion.choices[0]
            tool_call = choice.message.tool_calls[0]
            tool_name = tool_call.function.name
            # The arguments arrive as a JSON string; parse them instead of eval().
            tool_args = json.loads(tool_call.function.arguments)
            tool_result = await self.session.call_tool(tool_name, tool_args)
            tool_results.append({"tool": tool_name, "result": tool_result})
            # Record the assistant's tool call and the tool's output so the model sees
            # them on the next turn; model_dump() keeps the message JSON-serializable.
            messages.append({
                "role": "assistant",
                "tool_calls": [tool_call.model_dump()],
            })
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(tool_result.content),
            })
            completion = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=available_tools,
                tool_choice="auto",
            )
        final_text.append(completion.choices[0].message.content)
        return "\n".join(final_text)
    async def chat_loop(self):
        """Run an interactive chat loop."""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources."""
        await self.exit_stack.aclose()
async def main():
    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        await client.cleanup()
if __name__ == "__main__":
    asyncio.run(main())
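
# --- Companion sketch (not part of the original listing) ---
# A minimal MCP server the client above could launch over stdio. It would live
# in its own file (e.g. server.py) and be passed as the command-line argument:
#     python client.py server.py
# Assumptions: the official `mcp` SDK's FastMCP helper is used, and the tool
# name `get_quote` with its stubbed return value is purely illustrative.

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("market-data")

@mcp.tool()
def get_quote(symbol: str) -> str:
    """Return a placeholder quote for a trading symbol (stub data)."""
    return f"{symbol}: 123.45"

if __name__ == "__main__":
    mcp.run(transport="stdio")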