Initial commit: SSH-MCP server implementation

- SSH-MCP server with 7 infrastructure management tools
  * ssh_list_hosts - List all available hosts
  * ssh_exec - Execute commands on remote hosts
  * ssh_get_file / ssh_put_file - File operations
  * ssh_docker_ps - List Docker containers
  * lxc_list / lxc_exec - LXC container management

- HTTP/SSE transport wrapper for Claude Code integration
  * FastAPI-based HTTP server on port 8081
  * Server-Sent Events (SSE) support
  * MCP 2025 specification compliant

- Complete deployment on LXC 110 (10.50.0.110)
  * SSH key-based authentication (Ed25519)
  * Systemd service for automatic startup
  * Tested and verified on all infrastructure

- Comprehensive documentation
  * Complete deployment guide
  * Git collaboration workflow
  * API usage examples

Developed collaboratively by Claude Code and Agent Zero.

Infrastructure Access:
- photon.obnh.io (test target)
- proton.obr.sh (development server, hosts this repo)
- fry.obr.sh (migration target)
- Proxmox 10.50.0.72 (LXC/VM host)

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude Code
2025-11-13 13:31:07 +00:00
commit 7b98651e5a
5 changed files with 1321 additions and 0 deletions

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
HTTP/SSE Transport Wrapper for SSH-MCP Server
This provides an HTTP interface with Server-Sent Events (SSE) support
for Claude Code to connect to the SSH-MCP server.
"""
import asyncio
import json
import subprocess
from typing import Optional
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import StreamingResponse
import uvicorn
app = FastAPI(title="SSH-MCP Server", version="1.0.0")
# In-memory storage for MCP server process
mcp_process: Optional[asyncio.subprocess.Process] = None
async def start_mcp_server():
"""Start the MCP server subprocess"""
global mcp_process
if mcp_process is None:
mcp_process = await asyncio.create_subprocess_exec(
"/usr/bin/python3",
"/opt/ssh-mcp-server/server.py",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
return mcp_process
async def send_mcp_request(request_data: dict) -> dict:
"""Send request to MCP server and get response"""
process = await start_mcp_server()
# Send request
request_json = json.dumps(request_data) + "\n"
process.stdin.write(request_json.encode())
await process.stdin.drain()
# Read response
response_line = await process.stdout.readline()
response_data = json.loads(response_line.decode())
return response_data
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"name": "SSH-MCP Server",
"version": "1.0.0",
"status": "operational",
"description": "MCP server providing SSH-based infrastructure access",
"endpoints": {
"/": "Health check",
"/tools": "List available tools",
"/tools/call": "Call a tool",
"/sse": "Server-Sent Events endpoint"
}
}
@app.get("/health")
async def health():
"""Health check"""
return {"status": "healthy"}
@app.post("/tools")
async def list_tools():
"""List available MCP tools"""
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}
response = await send_mcp_request(request)
return response.get("result", {})
@app.post("/tools/call")
async def call_tool(request: Request):
"""Call an MCP tool"""
body = await request.json()
tool_name = body.get("name")
arguments = body.get("arguments", {})
mcp_request = {
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": tool_name,
"arguments": arguments
}
}
response = await send_mcp_request(mcp_request)
return response.get("result", {})
@app.get("/sse")
async def sse_endpoint(request: Request):
"""
Server-Sent Events endpoint for real-time MCP communication
This is required for Claude Code's MCP connector
"""
async def event_generator():
# Send initial connection event
yield f"data: {json.dumps({'type': 'connected', 'server': 'ssh-mcp-server'})}\n\n"
# Keep connection alive
while True:
if await request.is_disconnected():
break
# Send periodic heartbeat
yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': asyncio.get_event_loop().time()})}\n\n"
await asyncio.sleep(30)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
}
)
@app.post("/message")
async def message(request: Request):
"""
MCP message endpoint for Claude Code integration
Accepts JSON-RPC 2.0 messages
"""
body = await request.json()
response = await send_mcp_request(body)
return response
# MCP Server Info endpoint (required by MCP spec)
@app.get("/mcp/info")
async def mcp_info():
"""MCP server information"""
return {
"name": "ssh-mcp-server",
"version": "1.0.0",
"description": "SSH-based MCP server for OBNH/OBR infrastructure management",
"capabilities": {
"tools": True,
"resources": False,
"prompts": False
},
"transport": "http+sse"
}
if __name__ == "__main__":
print("Starting SSH-MCP HTTP/SSE Server on port 8081...")
uvicorn.run(
app,
host="0.0.0.0",
port=8081,
log_level="info"
)

View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
"""
SSH-MCP Server for OBNH/OBR Infrastructure Management
This MCP server provides SSH-based tools for infrastructure access.
"""
import asyncio
import json
import subprocess
from typing import Any, Dict, List
from pathlib import Path
# MCP Server implementation using stdio transport
# Based on Model Context Protocol 2025 specification
class SSHMCPServer:
"""SSH-MCP Server providing infrastructure access tools"""
def __init__(self):
self.hosts = {
"photon": {
"hostname": "photon.obnh.io",
"ip": "46.247.109.251",
"description": "Debian server with Traefik, Gitea, Mastodon"
},
"proton": {
"hostname": "proton.obr.sh",
"ip": "72.61.83.117",
"description": "Ubuntu server with Traefik, Gitea"
},
"fry": {
"hostname": "fry.obr.sh",
"ip": "v48682",
"description": "Ubuntu server (photon replacement)"
},
"proxmox": {
"hostname": "10.50.0.72",
"ip": "10.50.0.72",
"description": "Proxmox host"
}
}
def list_tools(self) -> List[Dict[str, Any]]:
"""List available MCP tools"""
return [
{
"name": "ssh_exec",
"description": "Execute a command on a remote host via SSH",
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Host to connect to (photon, proton, fry, proxmox)"
},
"command": {
"type": "string",
"description": "Command to execute"
}
},
"required": ["host", "command"]
}
},
{
"name": "ssh_list_hosts",
"description": "List all available hosts for SSH access",
"inputSchema": {
"type": "object",
"properties": {}
}
},
{
"name": "ssh_get_file",
"description": "Read a file from a remote host",
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Host to connect to"
},
"path": {
"type": "string",
"description": "Path to the file"
}
},
"required": ["host", "path"]
}
},
{
"name": "ssh_put_file",
"description": "Write a file to a remote host",
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Host to connect to"
},
"path": {
"type": "string",
"description": "Path where to write the file"
},
"content": {
"type": "string",
"description": "File content"
}
},
"required": ["host", "path", "content"]
}
},
{
"name": "ssh_docker_ps",
"description": "List Docker containers on a remote host",
"inputSchema": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Host to connect to"
}
},
"required": ["host"]
}
},
{
"name": "lxc_list",
"description": "List LXC containers on Proxmox host",
"inputSchema": {
"type": "object",
"properties": {
"proxmox_host": {
"type": "string",
"description": "Proxmox host (default: proxmox)",
"default": "proxmox"
}
}
}
},
{
"name": "lxc_exec",
"description": "Execute command in an LXC container on Proxmox",
"inputSchema": {
"type": "object",
"properties": {
"proxmox_host": {
"type": "string",
"description": "Proxmox host",
"default": "proxmox"
},
"container_id": {
"type": "string",
"description": "LXC container ID"
},
"command": {
"type": "string",
"description": "Command to execute"
}
},
"required": ["container_id", "command"]
}
}
]
async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a tool and return results"""
try:
if tool_name == "ssh_list_hosts":
return await self._ssh_list_hosts()
elif tool_name == "ssh_exec":
return await self._ssh_exec(arguments["host"], arguments["command"])
elif tool_name == "ssh_get_file":
return await self._ssh_get_file(arguments["host"], arguments["path"])
elif tool_name == "ssh_put_file":
return await self._ssh_put_file(arguments["host"], arguments["path"], arguments["content"])
elif tool_name == "ssh_docker_ps":
return await self._ssh_docker_ps(arguments["host"])
elif tool_name == "lxc_list":
proxmox_host = arguments.get("proxmox_host", "proxmox")
return await self._lxc_list(proxmox_host)
elif tool_name == "lxc_exec":
proxmox_host = arguments.get("proxmox_host", "proxmox")
return await self._lxc_exec(proxmox_host, arguments["container_id"], arguments["command"])
else:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}]
}
except Exception as e:
return {
"isError": True,
"content": [{"type": "text", "text": f"Error executing {tool_name}: {str(e)}"}]
}
async def _ssh_list_hosts(self) -> Dict[str, Any]:
"""List available hosts"""
hosts_info = "\n".join([
f"- {name}: {info['hostname']} ({info['description']})"
for name, info in self.hosts.items()
])
return {
"content": [{"type": "text", "text": f"Available hosts:\n{hosts_info}"}]
}
async def _ssh_exec(self, host: str, command: str) -> Dict[str, Any]:
"""Execute SSH command"""
if host not in self.hosts:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown host: {host}"}]
}
hostname = self.hosts[host]["hostname"]
ssh_command = ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{hostname}", command]
try:
process = await asyncio.create_subprocess_exec(
*ssh_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result = stdout.decode() if process.returncode == 0 else stderr.decode()
return {
"content": [{"type": "text", "text": result}]
}
except Exception as e:
return {
"isError": True,
"content": [{"type": "text", "text": f"SSH error: {str(e)}"}]
}
async def _ssh_get_file(self, host: str, path: str) -> Dict[str, Any]:
"""Get file from remote host"""
if host not in self.hosts:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown host: {host}"}]
}
hostname = self.hosts[host]["hostname"]
result = await self._ssh_exec(host, f"cat {path}")
return result
async def _ssh_put_file(self, host: str, path: str, content: str) -> Dict[str, Any]:
"""Write file to remote host"""
if host not in self.hosts:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown host: {host}"}]
}
hostname = self.hosts[host]["hostname"]
# Use heredoc to write file
command = f"cat > {path} << 'EOFMCP'\n{content}\nEOFMCP"
result = await self._ssh_exec(host, command)
return {
"content": [{"type": "text", "text": f"File written to {path}"}]
}
async def _ssh_docker_ps(self, host: str) -> Dict[str, Any]:
"""List Docker containers"""
result = await self._ssh_exec(host, "docker ps --format '{{.Names}}\\t{{.Status}}\\t{{.Image}}'")
return result
async def _lxc_list(self, proxmox_host: str) -> Dict[str, Any]:
"""List LXC containers"""
if proxmox_host not in self.hosts:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown Proxmox host: {proxmox_host}"}]
}
result = await self._ssh_exec(proxmox_host, "pct list")
return result
async def _lxc_exec(self, proxmox_host: str, container_id: str, command: str) -> Dict[str, Any]:
"""Execute command in LXC container"""
if proxmox_host not in self.hosts:
return {
"isError": True,
"content": [{"type": "text", "text": f"Unknown Proxmox host: {proxmox_host}"}]
}
lxc_command = f"pct exec {container_id} -- {command}"
result = await self._ssh_exec(proxmox_host, lxc_command)
return result
async def main():
"""Run MCP server using stdio transport"""
server = SSHMCPServer()
# Read JSON-RPC messages from stdin, write to stdout
# This follows MCP stdio transport protocol
# Note: Do not print anything to stdout except JSON-RPC messages
while True:
try:
line = input()
if not line:
continue
request = json.loads(line)
if request.get("method") == "tools/list":
response = {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": {
"tools": server.list_tools()
}
}
elif request.get("method") == "tools/call":
params = request.get("params", {})
tool_name = params.get("name")
arguments = params.get("arguments", {})
result = await server.call_tool(tool_name, arguments)
response = {
"jsonrpc": "2.0",
"id": request.get("id"),
"result": result
}
else:
response = {
"jsonrpc": "2.0",
"id": request.get("id"),
"error": {
"code": -32601,
"message": f"Method not found: {request.get('method')}"
}
}
print(json.dumps(response), flush=True)
except EOFError:
break
except Exception as e:
error_response = {
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32603,
"message": f"Internal error: {str(e)}"
}
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":
asyncio.run(main())