blender-mcp/blender_mcp_addon.py

500 lines
19 KiB
Python

import bpy
import json
import threading
import socket
import time
from bpy.props import StringProperty, IntProperty
bl_info = {
"name": "Blender MCP",
"author": "BlenderMCP",
"version": (0, 1),
"blender": (3, 0, 0),
"location": "View3D > Sidebar > BlenderMCP",
"description": "Connect Blender to Claude via MCP",
"category": "Interface",
}
class BlenderMCPServer:
def __init__(self, host='localhost', port=9876):
self.host = host
self.port = port
self.running = False
self.socket = None
self.client = None
self.server_thread = None
def start(self):
self.running = True
self.server_thread = threading.Thread(target=self._run_server)
self.server_thread.daemon = True
self.server_thread.start()
print(f"BlenderMCP server started on {self.host}:{self.port}")
def stop(self):
self.running = False
if self.socket:
self.socket.close()
if self.client:
self.client.close()
print("BlenderMCP server stopped")
def _run_server(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.bind((self.host, self.port))
self.socket.listen(1)
self.socket.settimeout(1.0) # Add a timeout for accept
while self.running:
try:
self.client, address = self.socket.accept()
print(f"Connected to client: {address}")
while self.running:
try:
# Set a timeout for receiving data
self.client.settimeout(15.0)
data = self.client.recv(8192) # Increased buffer size
if not data:
print("Empty data received, client may have disconnected")
break
try:
print(f"Received data: {data.decode('utf-8')}")
command = json.loads(data.decode('utf-8'))
# Process the command
print(f"Processing command: {command.get('type')}")
response = self.execute_command(command)
# Send the response - handle large responses by chunking if needed
response_json = json.dumps(response)
print(f"Sending response: {response_json[:100]}...") # Truncate long responses in log
# Send in one go - most responses should be small enough
self.client.sendall(response_json.encode('utf-8'))
print("Response sent successfully")
except json.JSONDecodeError:
print(f"Invalid JSON received: {data.decode('utf-8')}")
self.client.sendall(json.dumps({
"status": "error",
"message": "Invalid JSON format"
}).encode('utf-8'))
except Exception as e:
print(f"Error executing command: {str(e)}")
import traceback
traceback.print_exc()
self.client.sendall(json.dumps({
"status": "error",
"message": str(e)
}).encode('utf-8'))
except socket.timeout:
print("Socket timeout while waiting for data")
continue
except Exception as e:
print(f"Error receiving data: {str(e)}")
break
if self.client:
self.client.close()
self.client = None
except socket.timeout:
# This is normal - just continue the loop
continue
except Exception as e:
print(f"Connection error: {str(e)}")
if self.client:
self.client.close()
self.client = None
time.sleep(1) # Prevent busy waiting
except Exception as e:
print(f"Server error: {str(e)}")
finally:
if self.socket:
self.socket.close()
def execute_command(self, command):
"""Execute a Blender command received from the MCP server"""
cmd_type = command.get("type")
params = command.get("params", {})
# Add a simple ping handler
if cmd_type == "ping":
print("Handling ping command")
return {"status": "success", "result": {"pong": True}}
handlers = {
"ping": lambda **kwargs: {"pong": True},
"get_simple_info": self.get_simple_info,
"get_scene_info": self.get_scene_info,
"create_object": self.create_object,
"modify_object": self.modify_object,
"delete_object": self.delete_object,
"get_object_info": self.get_object_info,
"execute_code": self.execute_code,
"set_material": self.set_material,
"render_scene": self.render_scene,
}
handler = handlers.get(cmd_type)
if handler:
try:
print(f"Executing handler for {cmd_type}")
result = handler(**params)
print(f"Handler execution complete")
return {"status": "success", "result": result}
except Exception as e:
print(f"Error in handler: {str(e)}")
import traceback
traceback.print_exc()
return {"status": "error", "message": str(e)}
else:
return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
def get_simple_info(self):
"""Get basic Blender information"""
return {
"blender_version": ".".join(str(v) for v in bpy.app.version),
"scene_name": bpy.context.scene.name,
"object_count": len(bpy.context.scene.objects)
}
def get_scene_info(self):
"""Get information about the current Blender scene"""
try:
print("Getting scene info...")
# Simplify the scene info to reduce data size
scene_info = {
"name": bpy.context.scene.name,
"object_count": len(bpy.context.scene.objects),
"objects": [],
"materials_count": len(bpy.data.materials),
}
# Collect minimal object information (limit to first 10 objects)
for i, obj in enumerate(bpy.context.scene.objects):
if i >= 10: # Reduced from 20 to 10
break
obj_info = {
"name": obj.name,
"type": obj.type,
# Only include basic location data
"location": [round(float(obj.location.x), 2),
round(float(obj.location.y), 2),
round(float(obj.location.z), 2)],
}
scene_info["objects"].append(obj_info)
print(f"Scene info collected: {len(scene_info['objects'])} objects")
return scene_info
except Exception as e:
print(f"Error in get_scene_info: {str(e)}")
import traceback
traceback.print_exc()
return {"error": str(e)}
def create_object(self, type="CUBE", name=None, location=(0, 0, 0), rotation=(0, 0, 0), scale=(1, 1, 1)):
"""Create a new object in the scene"""
# Deselect all objects
bpy.ops.object.select_all(action='DESELECT')
# Create the object based on type
if type == "CUBE":
bpy.ops.mesh.primitive_cube_add(location=location, rotation=rotation, scale=scale)
elif type == "SPHERE":
bpy.ops.mesh.primitive_uv_sphere_add(location=location, rotation=rotation, scale=scale)
elif type == "CYLINDER":
bpy.ops.mesh.primitive_cylinder_add(location=location, rotation=rotation, scale=scale)
elif type == "PLANE":
bpy.ops.mesh.primitive_plane_add(location=location, rotation=rotation, scale=scale)
elif type == "CONE":
bpy.ops.mesh.primitive_cone_add(location=location, rotation=rotation, scale=scale)
elif type == "TORUS":
bpy.ops.mesh.primitive_torus_add(location=location, rotation=rotation, scale=scale)
elif type == "EMPTY":
bpy.ops.object.empty_add(location=location, rotation=rotation, scale=scale)
elif type == "CAMERA":
bpy.ops.object.camera_add(location=location, rotation=rotation)
elif type == "LIGHT":
bpy.ops.object.light_add(type='POINT', location=location, rotation=rotation, scale=scale)
else:
raise ValueError(f"Unsupported object type: {type}")
# Get the created object
obj = bpy.context.active_object
# Rename if name is provided
if name:
obj.name = name
return {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
}
def modify_object(self, name, location=None, rotation=None, scale=None, visible=None):
"""Modify an existing object in the scene"""
# Find the object by name
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
# Modify properties as requested
if location is not None:
obj.location = location
if rotation is not None:
obj.rotation_euler = rotation
if scale is not None:
obj.scale = scale
if visible is not None:
obj.hide_viewport = not visible
obj.hide_render = not visible
return {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
"visible": obj.visible_get(),
}
def delete_object(self, name):
"""Delete an object from the scene"""
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
# Store the name to return
obj_name = obj.name
# Select and delete the object
bpy.ops.object.select_all(action='DESELECT')
obj.select_set(True)
bpy.ops.object.delete()
return {"deleted": obj_name}
def get_object_info(self, name):
"""Get detailed information about a specific object"""
obj = bpy.data.objects.get(name)
if not obj:
raise ValueError(f"Object not found: {name}")
# Basic object info
obj_info = {
"name": obj.name,
"type": obj.type,
"location": [obj.location.x, obj.location.y, obj.location.z],
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
"visible": obj.visible_get(),
"materials": [],
}
# Add material slots
for slot in obj.material_slots:
if slot.material:
obj_info["materials"].append(slot.material.name)
# Add mesh data if applicable
if obj.type == 'MESH' and obj.data:
mesh = obj.data
obj_info["mesh"] = {
"vertices": len(mesh.vertices),
"edges": len(mesh.edges),
"polygons": len(mesh.polygons),
}
return obj_info
def execute_code(self, code):
"""Execute arbitrary Blender Python code"""
# This is powerful but potentially dangerous - use with caution
try:
# Create a local namespace for execution
namespace = {"bpy": bpy}
exec(code, namespace)
return {"executed": True}
except Exception as e:
raise Exception(f"Code execution error: {str(e)}")
def set_material(self, object_name, material_name=None, create_if_missing=True, color=None):
"""Set or create a material for an object"""
obj = bpy.data.objects.get(object_name)
if not obj:
raise ValueError(f"Object not found: {object_name}")
# If material_name is provided, try to find or create the material
if material_name:
mat = bpy.data.materials.get(material_name)
if not mat and create_if_missing:
mat = bpy.data.materials.new(name=material_name)
# Set material color if provided
if color and len(color) >= 3:
mat.use_nodes = True
bsdf = mat.node_tree.nodes.get('Principled BSDF')
if bsdf:
bsdf.inputs[0].default_value = (color[0], color[1], color[2], 1.0 if len(color) < 4 else color[3])
# Assign material to object
if mat:
if obj.data.materials:
obj.data.materials[0] = mat
else:
obj.data.materials.append(mat)
return {"object": object_name, "material": material_name}
# If only color is provided, create a new material with the color
elif color:
# Create a new material with auto-generated name
mat_name = f"{object_name}_material"
mat = bpy.data.materials.new(name=mat_name)
# Set material color
mat.use_nodes = True
bsdf = mat.node_tree.nodes.get('Principled BSDF')
if bsdf:
bsdf.inputs[0].default_value = (color[0], color[1], color[2], 1.0 if len(color) < 4 else color[3])
# Assign material to object
if obj.data.materials:
obj.data.materials[0] = mat
else:
obj.data.materials.append(mat)
return {"object": object_name, "material": mat_name}
else:
return {"error": "Either material_name or color must be provided"}
def render_scene(self, output_path=None, resolution_x=None, resolution_y=None):
"""Render the current scene"""
if resolution_x is not None:
bpy.context.scene.render.resolution_x = resolution_x
if resolution_y is not None:
bpy.context.scene.render.resolution_y = resolution_y
if output_path:
bpy.context.scene.render.filepath = output_path
# Render the scene
bpy.ops.render.render(write_still=bool(output_path))
return {
"rendered": True,
"output_path": output_path if output_path else "[not saved]",
"resolution": [bpy.context.scene.render.resolution_x, bpy.context.scene.render.resolution_y],
}
# Blender UI Panel
class BLENDERMCP_PT_Panel(bpy.types.Panel):
bl_label = "Blender MCP"
bl_idname = "BLENDERMCP_PT_Panel"
bl_space_type = 'VIEW_3D'
bl_region_type = 'UI'
bl_category = 'BlenderMCP'
def draw(self, context):
layout = self.layout
scene = context.scene
layout.prop(scene, "blendermcp_port")
if not scene.blendermcp_server_running:
layout.operator("blendermcp.start_server", text="Start MCP Server")
else:
layout.operator("blendermcp.stop_server", text="Stop MCP Server")
layout.label(text=f"Running on port {scene.blendermcp_port}")
# Operator to start the server
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
bl_idname = "blendermcp.start_server"
bl_label = "Start BlenderMCP Server"
bl_description = "Start the BlenderMCP server to connect with Claude"
def execute(self, context):
scene = context.scene
# Create a new server instance
if not hasattr(bpy.types, "blendermcp_server") or not bpy.types.blendermcp_server:
bpy.types.blendermcp_server = BlenderMCPServer(port=scene.blendermcp_port)
# Start the server
bpy.types.blendermcp_server.start()
scene.blendermcp_server_running = True
return {'FINISHED'}
# Operator to stop the server
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
bl_idname = "blendermcp.stop_server"
bl_label = "Stop BlenderMCP Server"
bl_description = "Stop the BlenderMCP server"
def execute(self, context):
scene = context.scene
# Stop the server if it exists
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
bpy.types.blendermcp_server.stop()
del bpy.types.blendermcp_server
scene.blendermcp_server_running = False
return {'FINISHED'}
# Registration functions
def register():
bpy.types.Scene.blendermcp_port = IntProperty(
name="Port",
description="Port for the BlenderMCP server",
default=9876,
min=1024,
max=65535
)
bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
name="Server Running",
default=False
)
bpy.utils.register_class(BLENDERMCP_PT_Panel)
bpy.utils.register_class(BLENDERMCP_OT_StartServer)
bpy.utils.register_class(BLENDERMCP_OT_StopServer)
print("BlenderMCP addon registered")
def unregister():
# Stop the server if it's running
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
bpy.types.blendermcp_server.stop()
del bpy.types.blendermcp_server
bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)
del bpy.types.Scene.blendermcp_port
del bpy.types.Scene.blendermcp_server_running
print("BlenderMCP addon unregistered")
if __name__ == "__main__":
register()