895 lines
39 KiB
Python
895 lines
39 KiB
Python
import bpy
|
|
import json
|
|
import threading
|
|
import socket
|
|
import time
|
|
import requests # Add this import for HTTP requests
|
|
import tempfile # Add this import for temporary directories
|
|
from bpy.props import StringProperty, IntProperty
|
|
import traceback
|
|
import os
|
|
import shutil
|
|
|
|
bl_info = {
|
|
"name": "Blender MCP",
|
|
"author": "BlenderMCP",
|
|
"version": (0, 1),
|
|
"blender": (3, 0, 0),
|
|
"location": "View3D > Sidebar > BlenderMCP",
|
|
"description": "Connect Blender to Claude via MCP",
|
|
"category": "Interface",
|
|
}
|
|
|
|
class BlenderMCPServer:
|
|
def __init__(self, host='localhost', port=9876):
|
|
self.host = host
|
|
self.port = port
|
|
self.running = False
|
|
self.socket = None
|
|
self.client = None
|
|
self.command_queue = []
|
|
self.buffer = b'' # Add buffer for incomplete data
|
|
|
|
def start(self):
|
|
self.running = True
|
|
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
|
|
try:
|
|
self.socket.bind((self.host, self.port))
|
|
self.socket.listen(1)
|
|
self.socket.setblocking(False)
|
|
# Register the timer
|
|
bpy.app.timers.register(self._process_server, persistent=True)
|
|
print(f"BlenderMCP server started on {self.host}:{self.port}")
|
|
except Exception as e:
|
|
print(f"Failed to start server: {str(e)}")
|
|
self.stop()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
if hasattr(bpy.app.timers, "unregister"):
|
|
if bpy.app.timers.is_registered(self._process_server):
|
|
bpy.app.timers.unregister(self._process_server)
|
|
if self.socket:
|
|
self.socket.close()
|
|
if self.client:
|
|
self.client.close()
|
|
self.socket = None
|
|
self.client = None
|
|
print("BlenderMCP server stopped")
|
|
|
|
def _process_server(self):
|
|
"""Timer callback to process server operations"""
|
|
if not self.running:
|
|
return None # Unregister timer
|
|
|
|
try:
|
|
# Accept new connections
|
|
if not self.client and self.socket:
|
|
try:
|
|
self.client, address = self.socket.accept()
|
|
self.client.setblocking(False)
|
|
print(f"Connected to client: {address}")
|
|
except BlockingIOError:
|
|
pass # No connection waiting
|
|
except Exception as e:
|
|
print(f"Error accepting connection: {str(e)}")
|
|
|
|
# Process existing connection
|
|
if self.client:
|
|
try:
|
|
# Try to receive data
|
|
try:
|
|
data = self.client.recv(8192)
|
|
if data:
|
|
self.buffer += data
|
|
# Try to process complete messages
|
|
try:
|
|
# Attempt to parse the buffer as JSON
|
|
command = json.loads(self.buffer.decode('utf-8'))
|
|
# If successful, clear the buffer and process command
|
|
self.buffer = b''
|
|
response = self.execute_command(command)
|
|
response_json = json.dumps(response)
|
|
self.client.sendall(response_json.encode('utf-8'))
|
|
except json.JSONDecodeError:
|
|
# Incomplete data, keep in buffer
|
|
pass
|
|
else:
|
|
# Connection closed by client
|
|
print("Client disconnected")
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
except BlockingIOError:
|
|
pass # No data available
|
|
except Exception as e:
|
|
print(f"Error receiving data: {str(e)}")
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
|
|
except Exception as e:
|
|
print(f"Error with client: {str(e)}")
|
|
if self.client:
|
|
self.client.close()
|
|
self.client = None
|
|
self.buffer = b''
|
|
|
|
except Exception as e:
|
|
print(f"Server error: {str(e)}")
|
|
|
|
return 0.1 # Continue timer with 0.1 second interval
|
|
|
|
def execute_command(self, command):
|
|
"""Execute a command in the main Blender thread"""
|
|
try:
|
|
cmd_type = command.get("type")
|
|
params = command.get("params", {})
|
|
|
|
# Ensure we're in the right context
|
|
if cmd_type in ["create_object", "modify_object", "delete_object"]:
|
|
override = bpy.context.copy()
|
|
override['area'] = [area for area in bpy.context.screen.areas if area.type == 'VIEW_3D'][0]
|
|
with bpy.context.temp_override(**override):
|
|
return self._execute_command_internal(command)
|
|
else:
|
|
return self._execute_command_internal(command)
|
|
|
|
except Exception as e:
|
|
print(f"Error executing command: {str(e)}")
|
|
|
|
traceback.print_exc()
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
def _execute_command_internal(self, command):
|
|
"""Internal command execution with proper context"""
|
|
cmd_type = command.get("type")
|
|
params = command.get("params", {})
|
|
|
|
# Add a simple ping handler
|
|
if cmd_type == "ping":
|
|
print("Handling ping command")
|
|
return {"status": "success", "result": {"pong": True}}
|
|
|
|
handlers = {
|
|
"ping": lambda **kwargs: {"pong": True},
|
|
"get_simple_info": self.get_simple_info,
|
|
"get_scene_info": self.get_scene_info,
|
|
"create_object": self.create_object,
|
|
"modify_object": self.modify_object,
|
|
"delete_object": self.delete_object,
|
|
"get_object_info": self.get_object_info,
|
|
"execute_code": self.execute_code,
|
|
"set_material": self.set_material,
|
|
"render_scene": self.render_scene,
|
|
# Add Polyhaven handlers
|
|
"get_polyhaven_categories": self.get_polyhaven_categories,
|
|
"search_polyhaven_assets": self.search_polyhaven_assets,
|
|
"download_polyhaven_asset": self.download_polyhaven_asset,
|
|
}
|
|
|
|
handler = handlers.get(cmd_type)
|
|
if handler:
|
|
try:
|
|
print(f"Executing handler for {cmd_type}")
|
|
result = handler(**params)
|
|
print(f"Handler execution complete")
|
|
return {"status": "success", "result": result}
|
|
except Exception as e:
|
|
print(f"Error in handler: {str(e)}")
|
|
traceback.print_exc()
|
|
return {"status": "error", "message": str(e)}
|
|
else:
|
|
return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
|
|
|
|
|
|
def get_simple_info(self):
|
|
"""Get basic Blender information"""
|
|
return {
|
|
"blender_version": ".".join(str(v) for v in bpy.app.version),
|
|
"scene_name": bpy.context.scene.name,
|
|
"object_count": len(bpy.context.scene.objects)
|
|
}
|
|
|
|
def get_scene_info(self):
|
|
"""Get information about the current Blender scene"""
|
|
try:
|
|
print("Getting scene info...")
|
|
# Simplify the scene info to reduce data size
|
|
scene_info = {
|
|
"name": bpy.context.scene.name,
|
|
"object_count": len(bpy.context.scene.objects),
|
|
"objects": [],
|
|
"materials_count": len(bpy.data.materials),
|
|
}
|
|
|
|
# Collect minimal object information (limit to first 10 objects)
|
|
for i, obj in enumerate(bpy.context.scene.objects):
|
|
if i >= 10: # Reduced from 20 to 10
|
|
break
|
|
|
|
obj_info = {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
# Only include basic location data
|
|
"location": [round(float(obj.location.x), 2),
|
|
round(float(obj.location.y), 2),
|
|
round(float(obj.location.z), 2)],
|
|
}
|
|
scene_info["objects"].append(obj_info)
|
|
|
|
print(f"Scene info collected: {len(scene_info['objects'])} objects")
|
|
return scene_info
|
|
except Exception as e:
|
|
print(f"Error in get_scene_info: {str(e)}")
|
|
traceback.print_exc()
|
|
return {"error": str(e)}
|
|
|
|
def create_object(self, type="CUBE", name=None, location=(0, 0, 0), rotation=(0, 0, 0), scale=(1, 1, 1)):
|
|
"""Create a new object in the scene"""
|
|
# Deselect all objects
|
|
bpy.ops.object.select_all(action='DESELECT')
|
|
|
|
# Create the object based on type
|
|
if type == "CUBE":
|
|
bpy.ops.mesh.primitive_cube_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "SPHERE":
|
|
bpy.ops.mesh.primitive_uv_sphere_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CYLINDER":
|
|
bpy.ops.mesh.primitive_cylinder_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "PLANE":
|
|
bpy.ops.mesh.primitive_plane_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CONE":
|
|
bpy.ops.mesh.primitive_cone_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "TORUS":
|
|
bpy.ops.mesh.primitive_torus_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "EMPTY":
|
|
bpy.ops.object.empty_add(location=location, rotation=rotation, scale=scale)
|
|
elif type == "CAMERA":
|
|
bpy.ops.object.camera_add(location=location, rotation=rotation)
|
|
elif type == "LIGHT":
|
|
bpy.ops.object.light_add(type='POINT', location=location, rotation=rotation, scale=scale)
|
|
else:
|
|
raise ValueError(f"Unsupported object type: {type}")
|
|
|
|
# Get the created object
|
|
obj = bpy.context.active_object
|
|
|
|
# Rename if name is provided
|
|
if name:
|
|
obj.name = name
|
|
|
|
return {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
}
|
|
|
|
def modify_object(self, name, location=None, rotation=None, scale=None, visible=None):
|
|
"""Modify an existing object in the scene"""
|
|
# Find the object by name
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Modify properties as requested
|
|
if location is not None:
|
|
obj.location = location
|
|
|
|
if rotation is not None:
|
|
obj.rotation_euler = rotation
|
|
|
|
if scale is not None:
|
|
obj.scale = scale
|
|
|
|
if visible is not None:
|
|
obj.hide_viewport = not visible
|
|
obj.hide_render = not visible
|
|
|
|
return {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
"visible": obj.visible_get(),
|
|
}
|
|
|
|
def delete_object(self, name):
|
|
"""Delete an object from the scene"""
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Store the name to return
|
|
obj_name = obj.name
|
|
|
|
# Select and delete the object
|
|
bpy.ops.object.select_all(action='DESELECT')
|
|
obj.select_set(True)
|
|
bpy.ops.object.delete()
|
|
|
|
return {"deleted": obj_name}
|
|
|
|
def get_object_info(self, name):
|
|
"""Get detailed information about a specific object"""
|
|
obj = bpy.data.objects.get(name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {name}")
|
|
|
|
# Basic object info
|
|
obj_info = {
|
|
"name": obj.name,
|
|
"type": obj.type,
|
|
"location": [obj.location.x, obj.location.y, obj.location.z],
|
|
"rotation": [obj.rotation_euler.x, obj.rotation_euler.y, obj.rotation_euler.z],
|
|
"scale": [obj.scale.x, obj.scale.y, obj.scale.z],
|
|
"visible": obj.visible_get(),
|
|
"materials": [],
|
|
}
|
|
|
|
# Add material slots
|
|
for slot in obj.material_slots:
|
|
if slot.material:
|
|
obj_info["materials"].append(slot.material.name)
|
|
|
|
# Add mesh data if applicable
|
|
if obj.type == 'MESH' and obj.data:
|
|
mesh = obj.data
|
|
obj_info["mesh"] = {
|
|
"vertices": len(mesh.vertices),
|
|
"edges": len(mesh.edges),
|
|
"polygons": len(mesh.polygons),
|
|
}
|
|
|
|
return obj_info
|
|
|
|
def execute_code(self, code):
|
|
"""Execute arbitrary Blender Python code"""
|
|
# This is powerful but potentially dangerous - use with caution
|
|
try:
|
|
# Create a local namespace for execution
|
|
namespace = {"bpy": bpy}
|
|
exec(code, namespace)
|
|
return {"executed": True}
|
|
except Exception as e:
|
|
raise Exception(f"Code execution error: {str(e)}")
|
|
|
|
def set_material(self, object_name, material_name=None, create_if_missing=True, color=None):
|
|
"""Set or create a material for an object"""
|
|
try:
|
|
# Get the object
|
|
obj = bpy.data.objects.get(object_name)
|
|
if not obj:
|
|
raise ValueError(f"Object not found: {object_name}")
|
|
|
|
# Make sure object can accept materials
|
|
if not hasattr(obj, 'data') or not hasattr(obj.data, 'materials'):
|
|
raise ValueError(f"Object {object_name} cannot accept materials")
|
|
|
|
# Create or get material
|
|
if material_name:
|
|
mat = bpy.data.materials.get(material_name)
|
|
if not mat and create_if_missing:
|
|
mat = bpy.data.materials.new(name=material_name)
|
|
print(f"Created new material: {material_name}")
|
|
else:
|
|
# Generate unique material name if none provided
|
|
mat_name = f"{object_name}_material"
|
|
mat = bpy.data.materials.get(mat_name)
|
|
if not mat:
|
|
mat = bpy.data.materials.new(name=mat_name)
|
|
material_name = mat_name
|
|
print(f"Using material: {mat_name}")
|
|
|
|
# Set up material nodes if needed
|
|
if mat:
|
|
if not mat.use_nodes:
|
|
mat.use_nodes = True
|
|
|
|
# Get or create Principled BSDF
|
|
principled = mat.node_tree.nodes.get('Principled BSDF')
|
|
if not principled:
|
|
principled = mat.node_tree.nodes.new('ShaderNodeBsdfPrincipled')
|
|
# Get or create Material Output
|
|
output = mat.node_tree.nodes.get('Material Output')
|
|
if not output:
|
|
output = mat.node_tree.nodes.new('ShaderNodeOutputMaterial')
|
|
# Link if not already linked
|
|
if not principled.outputs[0].links:
|
|
mat.node_tree.links.new(principled.outputs[0], output.inputs[0])
|
|
|
|
# Set color if provided
|
|
if color and len(color) >= 3:
|
|
principled.inputs['Base Color'].default_value = (
|
|
color[0],
|
|
color[1],
|
|
color[2],
|
|
1.0 if len(color) < 4 else color[3]
|
|
)
|
|
print(f"Set material color to {color}")
|
|
|
|
# Assign material to object if not already assigned
|
|
if mat:
|
|
if not obj.data.materials:
|
|
obj.data.materials.append(mat)
|
|
else:
|
|
# Only modify first material slot
|
|
obj.data.materials[0] = mat
|
|
|
|
print(f"Assigned material {mat.name} to object {object_name}")
|
|
|
|
return {
|
|
"status": "success",
|
|
"object": object_name,
|
|
"material": mat.name,
|
|
"color": color if color else None
|
|
}
|
|
else:
|
|
raise ValueError(f"Failed to create or find material: {material_name}")
|
|
|
|
except Exception as e:
|
|
print(f"Error in set_material: {str(e)}")
|
|
traceback.print_exc()
|
|
return {
|
|
"status": "error",
|
|
"message": str(e),
|
|
"object": object_name,
|
|
"material": material_name if 'material_name' in locals() else None
|
|
}
|
|
|
|
def render_scene(self, output_path=None, resolution_x=None, resolution_y=None):
|
|
"""Render the current scene"""
|
|
if resolution_x is not None:
|
|
bpy.context.scene.render.resolution_x = resolution_x
|
|
|
|
if resolution_y is not None:
|
|
bpy.context.scene.render.resolution_y = resolution_y
|
|
|
|
if output_path:
|
|
bpy.context.scene.render.filepath = output_path
|
|
|
|
# Render the scene
|
|
bpy.ops.render.render(write_still=bool(output_path))
|
|
|
|
return {
|
|
"rendered": True,
|
|
"output_path": output_path if output_path else "[not saved]",
|
|
"resolution": [bpy.context.scene.render.resolution_x, bpy.context.scene.render.resolution_y],
|
|
}
|
|
|
|
def get_polyhaven_categories(self, asset_type):
|
|
"""Get categories for a specific asset type from Polyhaven"""
|
|
try:
|
|
if asset_type not in ["hdris", "textures", "models", "all"]:
|
|
return {"error": f"Invalid asset type: {asset_type}. Must be one of: hdris, textures, models, all"}
|
|
|
|
response = requests.get(f"https://api.polyhaven.com/categories/{asset_type}")
|
|
if response.status_code == 200:
|
|
return {"categories": response.json()}
|
|
else:
|
|
return {"error": f"API request failed with status code {response.status_code}"}
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
def search_polyhaven_assets(self, asset_type=None, categories=None):
|
|
"""Search for assets from Polyhaven with optional filtering"""
|
|
try:
|
|
url = "https://api.polyhaven.com/assets"
|
|
params = {}
|
|
|
|
if asset_type and asset_type != "all":
|
|
if asset_type not in ["hdris", "textures", "models"]:
|
|
return {"error": f"Invalid asset type: {asset_type}. Must be one of: hdris, textures, models, all"}
|
|
params["type"] = asset_type
|
|
|
|
if categories:
|
|
params["categories"] = categories
|
|
|
|
response = requests.get(url, params=params)
|
|
if response.status_code == 200:
|
|
# Limit the response size to avoid overwhelming Blender
|
|
assets = response.json()
|
|
# Return only the first 20 assets to keep response size manageable
|
|
limited_assets = {}
|
|
for i, (key, value) in enumerate(assets.items()):
|
|
if i >= 20: # Limit to 20 assets
|
|
break
|
|
limited_assets[key] = value
|
|
|
|
return {"assets": limited_assets, "total_count": len(assets), "returned_count": len(limited_assets)}
|
|
else:
|
|
return {"error": f"API request failed with status code {response.status_code}"}
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
def download_polyhaven_asset(self, asset_id, asset_type, resolution="1k", file_format=None):
|
|
"""Download an asset from Polyhaven and import it into Blender"""
|
|
|
|
try:
|
|
# First get the files information
|
|
files_response = requests.get(f"https://api.polyhaven.com/files/{asset_id}")
|
|
if files_response.status_code != 200:
|
|
return {"error": f"Failed to get asset files: {files_response.status_code}"}
|
|
|
|
files_data = files_response.json()
|
|
|
|
# Handle different asset types
|
|
if asset_type == "hdris":
|
|
# For HDRIs, download the .hdr or .exr file
|
|
if not file_format:
|
|
file_format = "hdr" # Default format for HDRIs
|
|
|
|
if "hdri" in files_data and resolution in files_data["hdri"] and file_format in files_data["hdri"][resolution]:
|
|
file_info = files_data["hdri"][resolution][file_format]
|
|
file_url = file_info["url"]
|
|
|
|
# For HDRIs, we need to save to a temporary file first
|
|
# since Blender can't properly load HDR data directly from memory
|
|
with tempfile.NamedTemporaryFile(suffix=f".{file_format}", delete=False) as tmp_file:
|
|
# Download the file
|
|
response = requests.get(file_url)
|
|
if response.status_code != 200:
|
|
return {"error": f"Failed to download HDRI: {response.status_code}"}
|
|
|
|
tmp_file.write(response.content)
|
|
tmp_path = tmp_file.name
|
|
|
|
try:
|
|
# Create a new world if none exists
|
|
if not bpy.data.worlds:
|
|
bpy.data.worlds.new("World")
|
|
|
|
world = bpy.data.worlds[0]
|
|
world.use_nodes = True
|
|
node_tree = world.node_tree
|
|
|
|
# Clear existing nodes
|
|
for node in node_tree.nodes:
|
|
node_tree.nodes.remove(node)
|
|
|
|
# Create nodes
|
|
tex_coord = node_tree.nodes.new(type='ShaderNodeTexCoord')
|
|
tex_coord.location = (-800, 0)
|
|
|
|
mapping = node_tree.nodes.new(type='ShaderNodeMapping')
|
|
mapping.location = (-600, 0)
|
|
|
|
# Load the image from the temporary file
|
|
env_tex = node_tree.nodes.new(type='ShaderNodeTexEnvironment')
|
|
env_tex.location = (-400, 0)
|
|
env_tex.image = bpy.data.images.load(tmp_path)
|
|
|
|
# FIXED: Use a color space that exists in all Blender versions
|
|
if file_format.lower() == 'exr':
|
|
# Try to use Linear color space for EXR files
|
|
try:
|
|
env_tex.image.colorspace_settings.name = 'Linear'
|
|
except:
|
|
# Fallback to Non-Color if Linear isn't available
|
|
env_tex.image.colorspace_settings.name = 'Non-Color'
|
|
else: # hdr
|
|
# For HDR files, try these options in order
|
|
for color_space in ['Linear', 'Linear Rec.709', 'Non-Color']:
|
|
try:
|
|
env_tex.image.colorspace_settings.name = color_space
|
|
break # Stop if we successfully set a color space
|
|
except:
|
|
continue
|
|
|
|
background = node_tree.nodes.new(type='ShaderNodeBackground')
|
|
background.location = (-200, 0)
|
|
|
|
output = node_tree.nodes.new(type='ShaderNodeOutputWorld')
|
|
output.location = (0, 0)
|
|
|
|
# Connect nodes
|
|
node_tree.links.new(tex_coord.outputs['Generated'], mapping.inputs['Vector'])
|
|
node_tree.links.new(mapping.outputs['Vector'], env_tex.inputs['Vector'])
|
|
node_tree.links.new(env_tex.outputs['Color'], background.inputs['Color'])
|
|
node_tree.links.new(background.outputs['Background'], output.inputs['Surface'])
|
|
|
|
# Set as active world
|
|
bpy.context.scene.world = world
|
|
|
|
# Clean up temporary file
|
|
try:
|
|
tempfile._cleanup() # This will clean up all temporary files
|
|
except:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"HDRI {asset_id} imported successfully",
|
|
"image_name": env_tex.image.name
|
|
}
|
|
except Exception as e:
|
|
return {"error": f"Failed to set up HDRI in Blender: {str(e)}"}
|
|
else:
|
|
return {"error": f"Requested resolution or format not available for this HDRI"}
|
|
|
|
elif asset_type == "textures":
|
|
# For textures, download available maps
|
|
if not file_format:
|
|
file_format = "jpg" # Default format for textures
|
|
|
|
# Find available maps (diffuse, normal, etc.)
|
|
downloaded_maps = {}
|
|
for map_type in files_data:
|
|
if map_type not in ["blend", "gltf"]: # Skip non-texture files
|
|
if resolution in files_data[map_type] and file_format in files_data[map_type][resolution]:
|
|
file_info = files_data[map_type][resolution][file_format]
|
|
file_url = file_info["url"]
|
|
|
|
# Download the file directly into Blender's memory
|
|
response = requests.get(file_url)
|
|
if response.status_code == 200:
|
|
# Create a new image in Blender's memory
|
|
image_name = f"{asset_id}_{map_type}.{file_format}"
|
|
image = bpy.data.images.new(name=image_name, width=1, height=1)
|
|
|
|
# Save the downloaded data
|
|
image.file_format = file_format.upper()
|
|
image.filepath_raw = f"/tmp/{image_name}" # This is just for reference
|
|
image.pack(data=response.content)
|
|
|
|
downloaded_maps[map_type] = image
|
|
|
|
if not downloaded_maps:
|
|
return {"error": f"No texture maps found for the requested resolution and format"}
|
|
|
|
# Create a new material with the downloaded textures
|
|
mat = bpy.data.materials.new(name=asset_id)
|
|
mat.use_nodes = True
|
|
nodes = mat.node_tree.nodes
|
|
links = mat.node_tree.links
|
|
|
|
# Clear default nodes
|
|
for node in nodes:
|
|
nodes.remove(node)
|
|
|
|
# Create output node
|
|
output = nodes.new(type='ShaderNodeOutputMaterial')
|
|
output.location = (300, 0)
|
|
|
|
# Create principled BSDF node
|
|
principled = nodes.new(type='ShaderNodeBsdfPrincipled')
|
|
principled.location = (0, 0)
|
|
links.new(principled.outputs[0], output.inputs[0])
|
|
|
|
# Add texture nodes based on available maps
|
|
tex_coord = nodes.new(type='ShaderNodeTexCoord')
|
|
tex_coord.location = (-800, 0)
|
|
|
|
mapping = nodes.new(type='ShaderNodeMapping')
|
|
mapping.location = (-600, 0)
|
|
links.new(tex_coord.outputs['UV'], mapping.inputs['Vector'])
|
|
|
|
# Position offset for texture nodes
|
|
x_pos = -400
|
|
y_pos = 300
|
|
|
|
# Connect different texture maps
|
|
for map_type, image in downloaded_maps.items():
|
|
tex_node = nodes.new(type='ShaderNodeTexImage')
|
|
tex_node.location = (x_pos, y_pos)
|
|
tex_node.image = image
|
|
tex_node.image.colorspace_settings.name = 'sRGB' if map_type in ['color', 'diffuse', 'albedo'] else 'Non-Color'
|
|
|
|
links.new(mapping.outputs['Vector'], tex_node.inputs['Vector'])
|
|
|
|
# Connect to appropriate input on Principled BSDF
|
|
if map_type in ['color', 'diffuse', 'albedo']:
|
|
links.new(tex_node.outputs['Color'], principled.inputs['Base Color'])
|
|
elif map_type in ['roughness', 'rough']:
|
|
links.new(tex_node.outputs['Color'], principled.inputs['Roughness'])
|
|
elif map_type in ['metallic', 'metalness', 'metal']:
|
|
links.new(tex_node.outputs['Color'], principled.inputs['Metallic'])
|
|
elif map_type in ['normal', 'nor']:
|
|
# Add normal map node
|
|
normal_map = nodes.new(type='ShaderNodeNormalMap')
|
|
normal_map.location = (x_pos + 200, y_pos)
|
|
links.new(tex_node.outputs['Color'], normal_map.inputs['Color'])
|
|
links.new(normal_map.outputs['Normal'], principled.inputs['Normal'])
|
|
elif map_type in ['displacement', 'disp', 'height']:
|
|
# Add displacement node
|
|
disp_node = nodes.new(type='ShaderNodeDisplacement')
|
|
disp_node.location = (x_pos + 200, y_pos - 200)
|
|
links.new(tex_node.outputs['Color'], disp_node.inputs['Height'])
|
|
links.new(disp_node.outputs['Displacement'], output.inputs['Displacement'])
|
|
|
|
y_pos -= 250
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Texture {asset_id} imported as material",
|
|
"material": mat.name,
|
|
"maps": list(downloaded_maps.keys())
|
|
}
|
|
|
|
elif asset_type == "models":
|
|
# For models, prefer glTF format if available
|
|
if not file_format:
|
|
file_format = "gltf" # Default format for models
|
|
|
|
if file_format in files_data and resolution in files_data[file_format]:
|
|
file_info = files_data[file_format][resolution][file_format]
|
|
file_url = file_info["url"]
|
|
|
|
# Create a temporary directory to store the model and its dependencies
|
|
temp_dir = tempfile.mkdtemp()
|
|
main_file_path = ""
|
|
|
|
try:
|
|
# Download the main model file
|
|
main_file_name = file_url.split("/")[-1]
|
|
main_file_path = os.path.join(temp_dir, main_file_name)
|
|
|
|
response = requests.get(file_url)
|
|
if response.status_code != 200:
|
|
return {"error": f"Failed to download model: {response.status_code}"}
|
|
|
|
with open(main_file_path, "wb") as f:
|
|
f.write(response.content)
|
|
|
|
# Check for included files and download them
|
|
if "include" in file_info and file_info["include"]:
|
|
for include_path, include_info in file_info["include"].items():
|
|
# Get the URL for the included file - this is the fix
|
|
include_url = include_info["url"]
|
|
|
|
# Create the directory structure for the included file
|
|
include_file_path = os.path.join(temp_dir, include_path)
|
|
os.makedirs(os.path.dirname(include_file_path), exist_ok=True)
|
|
|
|
# Download the included file
|
|
include_response = requests.get(include_url)
|
|
if include_response.status_code == 200:
|
|
with open(include_file_path, "wb") as f:
|
|
f.write(include_response.content)
|
|
else:
|
|
print(f"Failed to download included file: {include_path}")
|
|
|
|
# Import the model into Blender
|
|
if file_format == "gltf" or file_format == "glb":
|
|
bpy.ops.import_scene.gltf(filepath=main_file_path)
|
|
elif file_format == "fbx":
|
|
bpy.ops.import_scene.fbx(filepath=main_file_path)
|
|
elif file_format == "obj":
|
|
bpy.ops.import_scene.obj(filepath=main_file_path)
|
|
elif file_format == "blend":
|
|
# For blend files, we need to append or link
|
|
with bpy.data.libraries.load(main_file_path, link=False) as (data_from, data_to):
|
|
data_to.objects = data_from.objects
|
|
|
|
# Link the objects to the scene
|
|
for obj in data_to.objects:
|
|
if obj is not None:
|
|
bpy.context.collection.objects.link(obj)
|
|
else:
|
|
return {"error": f"Unsupported model format: {file_format}"}
|
|
|
|
# Get the names of imported objects
|
|
imported_objects = [obj.name for obj in bpy.context.selected_objects]
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Model {asset_id} imported successfully",
|
|
"imported_objects": imported_objects
|
|
}
|
|
except Exception as e:
|
|
return {"error": f"Failed to import model: {str(e)}"}
|
|
finally:
|
|
# Clean up temporary directory
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except:
|
|
print(f"Failed to clean up temporary directory: {temp_dir}")
|
|
else:
|
|
return {"error": f"Requested format or resolution not available for this model"}
|
|
|
|
else:
|
|
return {"error": f"Unsupported asset type: {asset_type}"}
|
|
|
|
except Exception as e:
|
|
return {"error": f"Failed to download asset: {str(e)}"}
|
|
|
|
# Blender UI Panel
|
|
class BLENDERMCP_PT_Panel(bpy.types.Panel):
|
|
bl_label = "Blender MCP"
|
|
bl_idname = "BLENDERMCP_PT_Panel"
|
|
bl_space_type = 'VIEW_3D'
|
|
bl_region_type = 'UI'
|
|
bl_category = 'BlenderMCP'
|
|
|
|
def draw(self, context):
|
|
layout = self.layout
|
|
scene = context.scene
|
|
|
|
layout.prop(scene, "blendermcp_port")
|
|
|
|
if not scene.blendermcp_server_running:
|
|
layout.operator("blendermcp.start_server", text="Start MCP Server")
|
|
else:
|
|
layout.operator("blendermcp.stop_server", text="Stop MCP Server")
|
|
layout.label(text=f"Running on port {scene.blendermcp_port}")
|
|
|
|
# Operator to start the server
|
|
class BLENDERMCP_OT_StartServer(bpy.types.Operator):
|
|
bl_idname = "blendermcp.start_server"
|
|
bl_label = "Start BlenderMCP Server"
|
|
bl_description = "Start the BlenderMCP server to connect with Claude"
|
|
|
|
def execute(self, context):
|
|
scene = context.scene
|
|
|
|
# Create a new server instance
|
|
if not hasattr(bpy.types, "blendermcp_server") or not bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server = BlenderMCPServer(port=scene.blendermcp_port)
|
|
|
|
# Start the server
|
|
bpy.types.blendermcp_server.start()
|
|
scene.blendermcp_server_running = True
|
|
|
|
return {'FINISHED'}
|
|
|
|
# Operator to stop the server
|
|
class BLENDERMCP_OT_StopServer(bpy.types.Operator):
|
|
bl_idname = "blendermcp.stop_server"
|
|
bl_label = "Stop BlenderMCP Server"
|
|
bl_description = "Stop the BlenderMCP server"
|
|
|
|
def execute(self, context):
|
|
scene = context.scene
|
|
|
|
# Stop the server if it exists
|
|
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server.stop()
|
|
del bpy.types.blendermcp_server
|
|
|
|
scene.blendermcp_server_running = False
|
|
|
|
return {'FINISHED'}
|
|
|
|
# Registration functions
|
|
def register():
|
|
bpy.types.Scene.blendermcp_port = IntProperty(
|
|
name="Port",
|
|
description="Port for the BlenderMCP server",
|
|
default=9876,
|
|
min=1024,
|
|
max=65535
|
|
)
|
|
|
|
bpy.types.Scene.blendermcp_server_running = bpy.props.BoolProperty(
|
|
name="Server Running",
|
|
default=False
|
|
)
|
|
|
|
bpy.utils.register_class(BLENDERMCP_PT_Panel)
|
|
bpy.utils.register_class(BLENDERMCP_OT_StartServer)
|
|
bpy.utils.register_class(BLENDERMCP_OT_StopServer)
|
|
|
|
print("BlenderMCP addon registered")
|
|
|
|
def unregister():
|
|
# Stop the server if it's running
|
|
if hasattr(bpy.types, "blendermcp_server") and bpy.types.blendermcp_server:
|
|
bpy.types.blendermcp_server.stop()
|
|
del bpy.types.blendermcp_server
|
|
|
|
bpy.utils.unregister_class(BLENDERMCP_PT_Panel)
|
|
bpy.utils.unregister_class(BLENDERMCP_OT_StartServer)
|
|
bpy.utils.unregister_class(BLENDERMCP_OT_StopServer)
|
|
|
|
del bpy.types.Scene.blendermcp_port
|
|
del bpy.types.Scene.blendermcp_server_running
|
|
|
|
print("BlenderMCP addon unregistered")
|
|
|
|
if __name__ == "__main__":
|
|
register() |