下面的两个文件分别实现了一个 MCP server 和一个调用者,用于展示 MCP 的基本原理。调用者需要第三方依赖 httpx,并从环境变量 OPENROUTER_API_KEY 读取 API 密钥。

mcp-server.py:
import sys
import json
import subprocess

# --- Safe whitelist configuration ---
# Only the commands mapped from the keys below may be executed.
SAFE_WHITELIST = dict(
    list_files="dir",
    who_am_i="whoami",
)


def send_response(response):
    """Write one JSON-RPC message to standard output.

    The message is serialized with ``json.dumps`` and emitted as a single
    newline-terminated line, then stdout is flushed.

    Args:
        response: JSON-serializable object to send.
    """
    sys.stdout.write(json.dumps(response) + "\n")
    # Flush immediately so a peer reading line by line sees the message now
    # instead of whenever the stdout buffer happens to fill.
    sys.stdout.flush()


# Name of the single tool advertised by tools/list.
_TOOL_NAME = "execute_white_command"


def _error_response(req_id, code, message):
    """Build a JSON-RPC 2.0 error response for the request with id *req_id*."""
    return {
        "jsonrpc": "2.0",
        "id": req_id,
        "error": {"code": code, "message": message},
    }


def _log_unexpected(exc):
    """Report *exc* on stderr so stdout stays a clean JSON-RPC message stream."""
    sys.stderr.write(f"Unexpected error: {str(exc)}\n")
    sys.stderr.flush()


def _run_whitelisted(action_key):
    """Run the whitelisted command selected by *action_key*.

    Returns:
        A ``(content, is_error)`` pair: the tool-result content list and a
        flag telling whether that content describes a failure.
    """
    # Only strings can be whitelist keys. The isinstance check also prevents a
    # TypeError when the JSON value is unhashable (a list or an object).
    target_cmd = SAFE_WHITELIST.get(action_key) if isinstance(action_key, str) else None
    if not target_cmd:
        return [
            {
                "type": "text",
                "text": f"Access Denied: '{action_key}' is not in whitelist.",
            }
        ], True

    try:
        # shell=True is kept on purpose: "dir" is a cmd.exe built-in on
        # Windows and cannot be launched without a shell. The command string
        # comes from SAFE_WHITELIST only, never from the request itself.
        output = subprocess.check_output(
            target_cmd,
            shell=True,
            stderr=subprocess.STDOUT,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        return [{"type": "text", "text": f"Execution Error: {e.output}"}], True
    except Exception as e:
        return [{"type": "text", "text": f"System Error: {str(e)}"}], True
    return [{"type": "text", "text": output}], False


def _handle_request(request):
    """Dispatch one decoded JSON-RPC message.

    Returns:
        The response dict to send back, or ``None`` when no reply is due
        (the "initialized" notification, or an unknown method sent without
        an "id" member).
    """
    req_id = request.get("id")
    method = request.get("method")
    # An explicit '"params": null' is treated like a missing member.
    params = request.get("params") or {}

    # 1. Initialization request
    if method == "initialize":
        result = {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": "raw-white-server",
                "version": "1.0.0",
            },
        }

    # 2. List the available tools
    elif method == "tools/list":
        result = {
            "tools": [
                {
                    "name": _TOOL_NAME,
                    "description": "执行白名单内的安全系统指令",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "action_key": {
                                "type": "string",
                                "enum": list(SAFE_WHITELIST.keys()),
                                "description": "白名单指令键",
                            }
                        },
                        "required": ["action_key"],
                    },
                }
            ]
        }

    # 3. Tool call
    elif method == "tools/call":
        tool_name = params.get("name")
        # Reject calls aimed at a tool this server does not provide. A missing
        # name is still accepted so callers that never sent one keep working.
        if tool_name not in (None, _TOOL_NAME):
            return _error_response(req_id, -32602, f"Unknown tool: {tool_name}")
        # The whitelist key travels inside the nested "arguments" object.
        arguments = params.get("arguments") or {}
        content, is_error = _run_whitelisted(arguments.get("action_key"))
        result = {"content": content, "isError": is_error}

    # 4. "Initialization finished" notification, or any other message without
    #    an id: notifications never get a reply.
    elif method == "notifications/initialized" or "id" not in request:
        return None

    # Unknown method that expects an answer: reply with an error so the
    # caller is not left blocked waiting for a line that never comes.
    else:
        return _error_response(req_id, -32601, f"Method not found: {method}")

    return {"jsonrpc": "2.0", "id": req_id, "result": result}


def main():
    """Run the MCP server loop over stdin/stdout.

    Reads newline-delimited JSON-RPC messages from stdin until EOF and writes
    each reply through ``send_response``. Lines that are not valid JSON are
    ignored. Unexpected failures are reported on stderr; when the failing
    request carried an id, an "Internal error" reply is also sent.
    """
    # Read strictly line by line to stay in step with the stdio protocol;
    # readline() returns "" only at EOF, which ends the loop.
    for line in iter(sys.stdin.readline, ""):
        try:
            request = json.loads(line)
        except json.JSONDecodeError:
            # Ignore input that is not JSON.
            continue
        except Exception as e:
            _log_unexpected(e)
            continue

        try:
            response = _handle_request(request)
        except Exception as e:
            # Errors go to stderr so they do not pollute the stdout stream.
            _log_unexpected(e)
            req_id = request.get("id") if isinstance(request, dict) else None
            response = (
                None
                if req_id is None
                else _error_response(req_id, -32603, "Internal error")
            )

        if response is not None:
            try:
                send_response(response)
            except Exception as e:
                _log_unexpected(e)


if __name__ == "__main__":
    main()
main.py:
import json
import os
import subprocess
import sys

import httpx

# Chat-completions endpoint and model id used for every request; the API key
# is read from the environment (None when the variable is unset).
API_URL = "https://openrouter.ai/api/v1/chat/completions"
CURRENT_MODEL = "openrouter/free"
API_KEY = os.environ.get("OPENROUTER_API_KEY")


def call_api(messages, tools=None):
    """POST a chat-completions request to API_URL and return the decoded JSON.

    Args:
        messages: List of chat message dicts sent as the "messages" field.
        tools: Optional list of tool definitions. The "tools" field is left
            out of the payload when this is None or empty.

    Returns:
        The response body parsed from JSON.

    Raises:
        RuntimeError: If API_KEY is empty (OPENROUTER_API_KEY is not set).
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    # Fail fast with a clear message instead of sending "Bearer None" and
    # getting an opaque authentication error back.
    if not API_KEY:
        raise RuntimeError("OPENROUTER_API_KEY environment variable is not set")

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    }

    payload = {
        "model": CURRENT_MODEL,
        "messages": messages,
    }
    if tools:
        payload["tools"] = tools

    response = httpx.post(API_URL, headers=headers, json=payload, timeout=60.0)
    response.raise_for_status()
    return response.json()


# MCP server: started as a child process whose stdin/stdout carry the
# newline-delimited JSON-RPC messages, with stderr captured for diagnostics.
# sys.executable starts the server with the same interpreter that runs this
# script (a bare "python" may be missing from PATH or be another version), and
# the script path is resolved next to this file so the current working
# directory does not matter.
server_process = subprocess.Popen(
    [
        sys.executable,
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "mcp-server.py"),
    ],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True,
    bufsize=0,
)


def call_mcp_server(method, params, req_id):
    """Send one JSON-RPC request to the MCP server and return its response.

    The request is written to the server's stdin as a single line; stdout is
    then read line by line until a message whose "id" equals *req_id* arrives.
    Messages with another id (or none, such as server notifications) are
    skipped so they cannot be mistaken for the answer.

    Args:
        method: JSON-RPC method name.
        params: Value for the "params" member.
        req_id: Request id; it must survive a JSON round trip unchanged
            (for example an int or a str) for the match to succeed.

    Returns:
        The decoded response dict, or ``{}`` when the server has exited, the
        pipe is broken, or the server emitted a line that is not valid JSON.
    """
    req = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
    try:
        server_process.stdin.write(json.dumps(req) + "\n")  # type: ignore
        server_process.stdin.flush()  # type: ignore
    except OSError as e:
        # Writing to a server that has already exited raises BrokenPipeError
        # (an OSError); report it the same way as an unexpected EOF below.
        print(f"[ERROR] MCP Server terminated: {e}")
        return {}

    while True:
        raw_line = server_process.stdout.readline()  # type: ignore
        if not raw_line:
            # EOF on stdout: the server is gone, so show what it left on stderr.
            error_info = server_process.stderr.read()  # type: ignore
            print(f"[ERROR] MCP Server terminated: {error_info}")
            return {}

        try:
            message = json.loads(raw_line)
        except json.JSONDecodeError:
            print(f"[ERROR] Invalid JSON from Server: {raw_line}")
            return {}

        if isinstance(message, dict) and message.get("id") == req_id:
            return message


# Driver script: MCP handshake -> tool discovery -> LLM call -> tool execution
# -> final LLM answer. Everything runs inside try/finally so the server child
# process is always stopped, even when a request fails part-way through.
try:
    # Handshake: the initialize request carries the protocol version plus the
    # client's capabilities and identity.
    call_mcp_server(
        "initialize",
        {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "raw-mcp-client", "version": "1.0.0"},
        },
        1,
    )
    # Confirm initialization. A notification has no id and gets no reply, so
    # it is written directly instead of going through call_mcp_server, which
    # would block waiting for a response line.
    try:
        server_process.stdin.write(  # type: ignore
            json.dumps({"jsonrpc": "2.0", "method": "notifications/initialized"})
            + "\n"
        )
        server_process.stdin.flush()  # type: ignore
    except OSError as e:
        print(f"[ERROR] MCP Server terminated: {e}")

    mcp_response = call_mcp_server("tools/list", {}, 2)
    raw_tools = mcp_response.get("result", {}).get("tools", [])

    # Convert the MCP tool descriptions into the "function" tool format sent
    # to the chat-completions API. A tool without a description gets "".
    llm_tools = [
        {
            "type": "function",
            "function": {
                "name": t["name"],
                "description": t.get("description", ""),
                "parameters": t["inputSchema"],
            },
        }
        for t in raw_tools
    ]

    messages = [
        {"role": "system", "content": "你是一个高效的运维助手。"},
        {"role": "user", "content": "帮我看看当前目录下有什么文件?"},
    ]

    llm_res_json = call_api(messages, tools=llm_tools)
    assistant_msg = llm_res_json["choices"][0]["message"]
    messages.append(assistant_msg)

    print("\n--- Model Output ---")
    print(json.dumps(assistant_msg, indent=2, ensure_ascii=False))

    # Tool calling
    tool_calls = assistant_msg.get("tool_calls") or []
    if tool_calls:
        # Ids 1 and 2 were used above; give every tool call its own id.
        for req_id, tool_call in enumerate(tool_calls, start=3):
            call_id = tool_call["id"]
            fn_name = tool_call["function"]["name"]
            # The model may send an empty string when a call has no arguments.
            raw_args = tool_call["function"].get("arguments") or "{}"

            try:
                fn_args = json.loads(raw_args)
            except json.JSONDecodeError as e:
                # Hand the problem back to the model instead of crashing.
                observation = f"Invalid tool arguments: {e}"
            else:
                print(f"[*] Executing Tool: {fn_name} with args: {fn_args}")

                mcp_exec_res = call_mcp_server(
                    "tools/call", {"name": fn_name, "arguments": fn_args}, req_id
                )

                mcp_error = mcp_exec_res.get("error")
                if mcp_error:
                    observation = (
                        f"MCP error: {json.dumps(mcp_error, ensure_ascii=False)}"
                    )
                else:
                    content_list = mcp_exec_res.get("result", {}).get("content", [])
                    # Join the text of every content item, not just the first.
                    observation = (
                        "\n".join(item.get("text", "") for item in content_list)
                        if content_list
                        else "No output"
                    )

            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": call_id,
                    "name": fn_name,
                    "content": observation,
                }
            )

        # Second round: let the model turn the tool results into an answer.
        final_res_json = call_api(messages)
        final_text = final_res_json["choices"][0]["message"]["content"]
        print("\n--- Final Results ---")
        print(final_text)
finally:
    # Stop the server and reap it so no child process is left behind.
    server_process.terminate()
    server_process.wait()