553 lines
21 KiB
Python
553 lines
21 KiB
Python
import bpy
|
|
import json
|
|
import threading
|
|
import socket
|
|
import time
|
|
from bpy.props import StringProperty, IntProperty
|
|
|
|
bl_info = {
|
|
"name": "Blender MCP",
|
|
"author": "BlenderMCP",
|
|
"version": (0, 1),
|
|
"blender": (3, 0, 0),
|
|
"location": "View3D > Sidebar > BlenderMCP",
|
|
"description": "Connect Blender to Claude via MCP",
|
|
"category": "Interface",
|
|
}
|
|
|
|
class BlenderMCPServer:
|
|
def __init__(self, host='localhost', port=9876):
|
|
self.host = host
|
|
self.port = port
|
|
self.running = False
|
|
self.socket = None
|
|
self.client = None
|
|
self.command_queue = []
|
|
self.buffer = b'' # Add buffer for incomplete data
|
|
|
|
def start(self):
|
|
self.running = True
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
|
|
try:
|
|
self.socket.bind((self.host, self.port))
|
|
self.socket.listen(1)
|
|
self.socket.setblocking(False)
|
|
# Register the timer
|
|
bpy.app.timers.register(self._process_server, persistent=True)
|
|
print(f"BlenderMCP server started on {self.host}:{self.port}")
|
|
except Exception as e:
|
|
print(f"Failed to start server: {str(e)}")
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
if hasattr(bpy.app.timers, "unregister"):
|
|
if bpy.app.timers.is_registered(self._process_server):
|
|
bpy.app.timers.unregister(self._process_server)
|
|
if self.socket:
|
|
self.socket.close()
|
|
if self.client:
|
|
self.client.close()
|
|
self.socket = None
|
|
self.client = None
|
|
print("BlenderMCP server stopped")
|
|
|
|
def _process_server(self):
|
|
"""Timer callback to process server operations"""
|
|
if not self.running:
|
|
return None # Unregister timer
|
|
|
|
try:
|
|
# Accept new connections
|
|
if not self.client and self.socket:
|
|
try:
|
|
self.client, address = self.socket.accept()
|
|
self.client.setblocking(False)
|
|
print(f"Connected to client: {address}")
|
|
except BlockingIOError:
|
|
pass # No connection waiting
|
|
except Exception as e:
|
|
print(f"Error accepting connection: {str(e)}")
|
|
|
|
# Process existing connection
|
|
if self.client:
|
|
try:
|
|
# Try to receive data
|
|
try:
|
|
data = self.client.recv(8192)
|
|
if data:
|
|
self.buffer += data
|
|
# Try to process complete messages
|
|
try:
|
|
# Attempt to parse the buffer as JSON
|
|
command = json.loads(self.buffer.decode('utf-8'))
|
|
# If successful, clear the buffer and process command
|
|
self.buffer = b''
|
|
response = self.execute_command(command)
|
|
response_json = json.dumps(response)
|
|
self.client.sendall(response_json.encode('utf-8'))
|
|
except json.JSONDecodeError:
|
|
# Incomplete data, keep in buffer
|
|
pass
|
|
else:
|
|
# Connection closed by client
|
|
print("Client disconnected")
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
except BlockingIOError:
|
|
pass # No data available
|
|
except Exception as e:
|
|
print(f"Error receiving data: {str(e)}")
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
|
|
except Exception as e:
|
|
print(f"Error with client: {str(e)}")
|
|
if self.client:
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
|
|
except Exception as e:
|
|
print(f"Server error: {str(e)}")
|
|
|
|
return 0.1 # Continue timer with 0.1 second interval
|
|
|
|
def execute_command(self, command):
|
|
"""Execute a command in the main Blender thread"""
|
|
try:
|
|
cmd_type = command.get("type")
|
|
params = command.get("params", {})
|
|
|
|
# Ensure we're in the right context
|
|
if cmd_type in ["create_object", "modify_object", "delete_object"]:
|
|
override = bpy.context.copy()
|
|
override['area'] = [area for area in bpy.context.screen.areas if area.type == 'VIEW_3D'][0]
|
|
with bpy.context.temp_override(**override):
|
|
return self._execute_command_internal(command)
|
|
else:
|
|
return self._execute_command_internal(command)
|
|
|
|
except Exception as e:
|
|
print(f"Error executing command: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
def _execute_command_internal(self, command):
|
|
"""Internal command execution with proper context"""
|
|
cmd_type = command.get("type")
|
|
params = command.get("params", {})
|
|
|
|
# Add a simple ping handler
|
|
if cmd_type == "ping":
|
|
print("Handling ping command")
|
|
return {"status": "success", "result": {"pong": True}}
|
|
|
|
handlers = {
|
|
"ping": lambda **kwargs: {"pong": True},
|
|
"get_simple_info": self.get_simple_info,
|
|
"get_scene_info": self.get_scene_info,
|
|
"create_object": self.create_object,
|
|
"modify_object": self.modify_object,
|
|
"delete_object": self.delete_object,
|
|
"get_object_info": self.get_object_info,
|
|
"execute_code": self.execute_code,
|
|
"set_material": self.set_material,
|
|
"render_scene": self.render_scene,
|
|
}
|
|
|
|
handler = handlers.get(cmd_type)
|
|
if handler:
|
|
try:
|
|
print(f"Executing handler for {cmd_type}")
|
|
result = handler(**params)
|
|
print(f"Handler execution complete")
|
|
return {"status": "success", "result": result}
|
|
except Exception as e:
|
|
print(f"Error in handler: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {"status": "error", "message": str(e)}
|
|
else:
|
|
return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
|
|
|
|
|
|
def get_simple_info(self):
|
|
"""Get basic Blender information"""
|
|
return {
|
|
"blender_version": ".".join(str(v) for v in bpy.app.version),
|
|
"scene_name": bpy.context.scene.name,
|
|
"object_count": len(bpy.context.scene.objects)
|
|
}
|
|
|
|
def get_scene_info(self):
|
|
"""Get information about the current Blender scene"""
|
|
try:
|
|
print("Getting scene info...")
|
|
# Simplify the scene info to reduce data size
|
|
scene_info = {
|
|
"name": bpy.context.scene.name,
|
|
"object_count": len(bpy.context.scene.objects),
|
|
"objects": [],
|
|
"materials_count": len(bpy.data.materials),
|
|
}
|
|
|
|
# Collect minimal object information (limit to first 10 objects)
|
|
for i, obj in enumerate(bpy.context.scene.objects):
|
|
if i >= 10: # Reduced from 20 to 10
|
|
break
|
|
|
|
obj_info = {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
# Only include basic location data
|
|
"location": [round(float(obj.location.x), 2),
|
|
round(float(obj.location.y), 2),
|
|
round(float(obj.location.z), 2)],
|
|
}
|
|
scene_info["objects"].append(obj_info)
|
|
|
|
print(f"Scene info collected: {len(scene_info['objects'])} objects")
|
|
return scene_info
|
|
except Exception as e:
|
|
print(f"Error in get_scene_info: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {"error": str(e)}
|
|
|
|
def create_object(self, type="CUBE", name=None, location=(0, 0, 0), rotation=(0, 0, 0), scale=(1, 1, 1)):
|
|
"""Create a new object in the scene"""
|
|
# Deselect all objects
|
|
bpy.ops.object.select_all(action='DESELECT')
|
|
|
|
# Create the object based on type
|
|
if type == "CUBE":
|
|
bpy.ops.mesh.primitive_cube_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "SPHERE":
|
|
bpy.ops.mesh.primitive_uv_sphere_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CYLINDER":
|
|
bpy.ops.mesh.primitive_cylinder_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "PLANE":
|
|
bpy.ops.mesh.primitive_plane_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CONE":
|
|
bpy.ops.mesh.primitive_cone_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "TORUS":
|
|
bpy.ops.mesh.primitive_torus_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "EMPTY":
|
|
bpy.ops.object.empty_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CAMERA":
|
|
bpy.ops.object.camera_add(location=location, rotation=rotation)
|
|
elif type == "LIGHT":
|
|
bpy.ops.object.light_add(type='POINT', location=location, rotation=rotation, scale=scale)
|
|
else:
|
|
raise ValueError(f"Unsupported object type: {type}")
|
|
|
|
# Get the created object
|
|
obj = bpy.context.active_object
|
|
|
|
# Rename if name is provided
|
|
if name:
|
|
obj.name = name
|
|
|
|
return {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
}
|
|
|
|
def modify_object(self, name, location=None, rotation=None, scale=None, visible=None):
|
|
"""Modify an existing object in the scene"""
|
|
# Find the object by name
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Modify properties as requested
|
|
if location is not None:
|
|
obj.location = location
|
|
|
|
if rotation is not None:
|
|
obj.rotation_euler = rotation
|
|
|
|
if scale is not None:
|
|
obj.scale = scale
|
|
|
|
if visible is not None:
|
|
obj.hide_viewport = not visible
|
|
obj.hide_render = not visible
|
|
|
|
return {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
"visible": obj.visible_get(),
|
|
}
|
|
|
|
def delete_object(self, name):
|
|
"""Delete an object from the scene"""
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Store the name to return
|
|
obj_name = obj.name
|
|
|
|
# Select and delete the object
|
|
bpy.ops.object.select_all(action='DESELECT')
|
|
obj.select_set(True)
|
|
bpy.ops.object.delete()
|
|
|
|
return {"deleted": obj_name}
|
|
|
|
def get_object_info(self, name):
|
|
"""Get detailed information about a specific object"""
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Basic object info
|
|
obj_info = {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
"visible": obj.visible_get(),
|
|
"materials": [],
|
|
}
|
|
|
|
# Add material slots
|
|
for slot in obj.material_slots:
|
|
if slot.material:
|
|
obj_info["materials"].append(slot.material.name)
|
|
|
|
# Add mesh data if applicable
|
|
if obj.type == 'MESH' and obj.data:
|
|
mesh = obj.data
|
|
obj_info["mesh"] = {
|
|
"vertices": len(mesh.vertices),
|
|
"edges": len(mesh.edges),
|
|
"polygons": len(mesh.polygons),
|
|
}
|
|
|
|
return obj_info
|
|
|
|
def execute_code(self, code):
|
|
"""Execute arbitrary Blender Python code"""
|
|
# This is powerful but potentially dangerous - use with caution
|
|
try:
|
|
# Create a local namespace for execution
|
|
namespace = {"bpy": bpy}
|
|
exec(code, namespace)
|
|
return {"executed": True}
|
|
except Exception as e:
|
|
raise Exception(f"Code execution error: {str(e)}")
|
|
|
|
def set_material(self, object_name, material_name=None, create_if_missing=True, color=None):
|
|
"""Set or create a material for an object"""
|
|
try:
|
|
# Get the object
|
|
obj = bpy.data.objects.get(object_name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {object_name}")
|
|
|
|
# Make sure object can accept materials
|
|
if not hasattr(obj, 'data') or not hasattr(obj.data, 'materials'):
|
|
raise ValueError(f"Object {object_name} cannot accept materials")
|
|
|
|
# Create or get material
|
|
if material_name:
|
|
mat = bpy.data.materials.get(material_name)
|
|
if not mat and create_if_missing:
|
|
mat = bpy.data.materials.new(name=material_name)
|
|
print(f"Created new material: {material_name}")
|
|
else:
|
|
# Generate unique material name if none provided
|
|
mat_name = f"{object_name}_material"
|
|
mat = bpy.data.materials.get(mat_name)
|
|
if not mat:
|
|
mat = bpy.data.materials.new(name=mat_name)
|
|
material_name = mat_name
|
|
print(f"Using material: {mat_name}")
|
|
|
|
# Set up material nodes if needed
|
|
if mat:
|
|
if not mat.use_nodes:
|
|
mat.use_nodes = True
|
|
|
|
# Get or create Principled BSDF
|
|
principled = mat.node_tree.nodes.get('Principled BSDF')
|
|
if not principled:
|
|
principled = mat.node_tree.nodes.new('ShaderNodeBsdfPrincipled')
|
|
# Get or create Material Output
|
|
output = mat.node_tree.nodes.get('Material Output')
|
|
if not output:
|
|
output = mat.node_tree.nodes.new('ShaderNodeOutputMaterial')
|
|
# Link if not already linked
|
|
if not principled.outputs[0].links:
|
|
mat.node_tree.links.new(principled.outputs[0], output.inputs[0])
|
|
|
|
# Set color if provided
|
|
if color and len(color) >= 3:
|
|
principled.inputs['Base Color'].default_value = (
|
|
color[0],
|
|
color[1],
|
|
color[2],
|
|
1.0 if len(color) < 4 else color[3]
|
|
)
|
|
print(f"Set material color to {color}")
|
|
|
|
# Assign material to object if not already assigned
|
|
if mat:
|
|
if not obj.data.materials:
|
|
obj.data.materials.append(mat)
|
|
else:
|
|
# Only modify first material slot
|
|
obj.data.materials[0] = mat
|
|
|
|
print(f"Assigned material {mat.name} to object {object_name}")
|
|
|
|
return {
|
|
"status": "success",
|
|
"object": object_name,
|
|
"material": mat.name,
|
|
"color": color if color else None
|
|
}
|
|
else:
|
|
raise ValueError(f"Failed to create or find material: {material_name}")
|
|
|
|
except Exception as e:
|
|
print(f"Error in set_material: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {
|
|
"status": "error",
|
|
"message": str(e),
|
|
"object": object_name,
|
|
"material": material_name if 'material_name' in locals() else None
|
|
}
|
|
|
|
def render_scene(self, output_path=None, resolution_x=None, resolution_y=None):
|
|
"""Render the current scene"""
|
|
if resolution_x is not None:
|
|
bpy.context.scene.render.resolution_x = resolution_x
|
|
|
|
if resolution_y is not None:
|
|
bpy.context.scene.render.resolution_y = resolution_y
|
|
|
|
if output_path:
|
|
bpy.context.scene.render.filepath = output_path
|
|
|
|
# Render the scene
|
|
bpy.ops.render.render(write_still=bool(output_path))
|
|
|
|
return {
|
|
"rendered": True,
|
|
"output_path": output_path if output_path else "[not saved]",
|
|
"resolution": [bpy.context.scene.render.resolution_x, bpy.context.scene.render.resolution_y],
|
|
}
|
|
|
|
# Blender UI Panel
|
|
class BLENDERMCP_PT_Panel(bpy.types.Panel):
|
|
bl_label = "Blender MCP"
|
|
bl_idname = "BLENDERMCP_PT_Panel"
|
|
bl_space_type = 'VIEW_3D'
|
|
bl_region_type = 'UI'
|
|
bl_category = 'BlenderMCP'
|
|
|
|
def draw(self, context):
|
|
layout = self.layout
|
|
scene = context.scene
|
|
|
|
layout.prop(scene, "blendermcp_port")
|
|
|
|
if not scene.blendermcp_server_running:
|
|
layout.operator("blendermcp.start_server", text="Start MCP Server")
|
|
else:
|
|
layout.operator("blendermcp.stop_server", text="Stop MCP Server")
|
|
layout.label(text=f"Running on port {scene.blendermcp_port}")
|
|
|
|
# Operator to start the server
|
|
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
|
|
bl_idname = "blendermcp.start_server"
|
|
bl_label = "Start BlenderMCP Server"
|
|
bl_description = "Start the BlenderMCP server to connect with Claude"
|
|
|
|
def execute(self, context):
|
|
scene = context.scene
|
|
|
|
# Create a new server instance
|
|
if not hasattr(bpy.types, "blendermcp_server") or not bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server = BlenderMCPServer(port=scene.blendermcp_port)
|
|
|
|
# Start the server
|
|
bpy.types.blendermcp_server.start()
|
|
scene.blendermcp_server_running = True
|
|
|
|
return {'FINISHED'}
|
|
|
|
# Operator to stop the server
|
|
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
|
|
bl_idname = "blendermcp.stop_server"
|
|
bl_label = "Stop BlenderMCP Server"
|
|
bl_description = "Stop the BlenderMCP server"
|
|
|
|
def execute(self, context):
|
|
scene = context.scene
|
|
|
|
# Stop the server if it exists
|
|
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server.stop()
|
|
del bpy.types.blendermcp_server
|
|
|
|
scene.blendermcp_server_running = False
|
|
|
|
return {'FINISHED'}
|
|
|
|
# Registration functions
|
|
def register():
|
|
bpy.types.Scene.blendermcp_port = IntProperty(
|
|
name="Port",
|
|
description="Port for the BlenderMCP server",
|
|
default=9876,
|
|
min=1024,
|
|
max=65535
|
|
)
|
|
|
|
bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
|
|
name="Server Running",
|
|
default=False
|
|
)
|
|
|
|
bpy.utils.register_class(BLENDERMCP_PT_Panel)
|
|
bpy.utils.register_class(BLENDERMCP_OT_StartServer)
|
|
bpy.utils.register_class(BLENDERMCP_OT_StopServer)
|
|
|
|
print("BlenderMCP addon registered")
|
|
|
|
def unregister():
|
|
# Stop the server if it's running
|
|
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server.stop()
|
|
del bpy.types.blendermcp_server
|
|
|
|
bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
|
|
bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
|
|
bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)
|
|
|
|
del bpy.types.Scene.blendermcp_port
|
|
del bpy.types.Scene.blendermcp_server_running
|
|
|
|
print("BlenderMCP addon unregistered")
|
|
|
|
if __name__ == "__main__":
|
|
register() |