renamed to mcp-obsidian
src/mcp_obsidian/__init__.py
@@ -0,0 +1,9 @@
from . import server
import asyncio

def main():
    """Main entry point for the package."""
    asyncio.run(server.main())

# Optionally expose other important items at package level
__all__ = ['main', 'server']
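main() just drives the async server loop. A minimal sketch of invoking it by hand, assuming the package is installed and OBSIDIAN_API_KEY is already set in the environment (or in a .env file picked up by the server module):

import mcp_obsidian

mcp_obsidian.main()  # blocks, serving MCP requests over stdio until the client disconnects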
src/mcp_obsidian/obsidian.py
@@ -0,0 +1,134 @@
import requests
import urllib.parse
from typing import Any

class Obsidian():
    def __init__(
            self,
            api_key: str,
            protocol: str = 'https',
            host: str = "127.0.0.1",
            port: int = 27124,
            verify_ssl: bool = False,
    ):
        self.api_key = api_key
        self.protocol = protocol
        self.host = host
        self.port = port
        self.verify_ssl = verify_ssl
        self.timeout = (3, 6)

    def get_base_url(self) -> str:
        return f'{self.protocol}://{self.host}:{self.port}'

    def _get_headers(self) -> dict:
        headers = {
            'Authorization': f'Bearer {self.api_key}'
        }
        return headers

    def _safe_call(self, f) -> Any:
        try:
            return f()
        except requests.HTTPError as e:
            error_data = e.response.json() if e.response.content else {}
            code = error_data.get('errorCode', -1)
            message = error_data.get('message', '<unknown>')
            raise Exception(f"Error {code}: {message}")
        except requests.exceptions.RequestException as e:
            raise Exception(f"Request failed: {str(e)}")
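Every vault call below funnels through _safe_call, so callers see a single Exception carrying the plugin's errorCode and message, or a generic "Request failed" for transport errors. A minimal sketch of that behaviour, with a placeholder key:

from mcp_obsidian.obsidian import Obsidian

api = Obsidian(api_key="not-a-real-key")
try:
    api.list_files_in_vault()
except Exception as e:
    print(e)  # e.g. "Error -1: <unknown>" for an HTTP error without a JSON body, or "Request failed: ..." if the plugin is unreachable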
    def list_files_in_vault(self) -> Any:
        url = f"{self.get_base_url()}/vault/"

        def call_fn():
            response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()

            return response.json()['files']

        return self._safe_call(call_fn)

    def list_files_in_dir(self, dirpath: str) -> Any:
        url = f"{self.get_base_url()}/vault/{dirpath}/"

        def call_fn():
            response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()

            return response.json()['files']

        return self._safe_call(call_fn)

    def get_file_contents(self, filepath: str) -> Any:
        url = f"{self.get_base_url()}/vault/{filepath}"

        def call_fn():
            response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()

            return response.text

        return self._safe_call(call_fn)
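A hedged usage sketch of the read-only calls, assuming the Obsidian Local REST API plugin is listening on the default https://127.0.0.1:27124; the key and note paths are placeholders:

from mcp_obsidian.obsidian import Obsidian

api = Obsidian(api_key="your-plugin-api-key")
print(api.list_files_in_vault())                     # names of files/directories in the vault root
print(api.list_files_in_dir("Projects"))             # entries directly under Projects/
print(api.get_file_contents("Projects/roadmap.md"))  # raw markdown text of that note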
    def search(self, query: str, context_length: int = 100) -> Any:
        url = f"{self.get_base_url()}/search/simple/"
        params = {
            'query': query,
            'contextLength': context_length
        }

        def call_fn():
            response = requests.post(url, headers=self._get_headers(), params=params, verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()
            return response.json()

        return self._safe_call(call_fn)
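A sketch of a simple text search; the query string is only an example, and the result fields shown are the ones the simple_search tool handler reads further down in tools.py:

from mcp_obsidian.obsidian import Obsidian

api = Obsidian(api_key="your-plugin-api-key")
for hit in api.search("quarterly review", context_length=50):
    print(hit.get('filename'), len(hit.get('matches', [])))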
    def append_content(self, filepath: str, content: str) -> Any:
        url = f"{self.get_base_url()}/vault/{filepath}"

        def call_fn():
            response = requests.post(
                url,
                headers=self._get_headers() | {'Content-Type': 'text/markdown'},
                data=content,
                verify=self.verify_ssl,
                timeout=self.timeout
            )
            response.raise_for_status()
            return None

        return self._safe_call(call_fn)

    def patch_content(self, filepath: str, operation: str, target_type: str, target: str, content: str) -> Any:
        url = f"{self.get_base_url()}/vault/{filepath}"

        headers = self._get_headers() | {
            'Content-Type': 'text/markdown',
            'Operation': operation,
            'Target-Type': target_type,
            'Target': urllib.parse.quote(target)
        }

        def call_fn():
            response = requests.patch(url, headers=headers, data=content, verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()
            return None

        return self._safe_call(call_fn)
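The two write paths: append_content POSTs markdown to the end of a new or existing note, while patch_content issues a PATCH whose Operation/Target-Type/Target headers tell the plugin where to insert the body. A sketch with placeholder file names and heading:

from mcp_obsidian.obsidian import Obsidian

api = Obsidian(api_key="your-plugin-api-key")

# Append to the end of a note (the file is created if it does not exist)
api.append_content("Inbox.md", "- [ ] reply to design review\n")

# Insert under the "Tasks" heading of an existing daily note
api.patch_content("Daily/2024-01-15.md", "append", "heading", "Tasks", "- follow up with QA\n")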
    def search_json(self, query: dict) -> Any:
        url = f"{self.get_base_url()}/search/"

        headers = self._get_headers() | {
            'Content-Type': 'application/vnd.olrapi.jsonlogic+json'
        }

        def call_fn():
            response = requests.post(url, headers=headers, json=query, verify=self.verify_ssl, timeout=self.timeout)
            response.raise_for_status()
            return response.json()

        return self._safe_call(call_fn)
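search_json posts a JsonLogic query with the vendor content type. A sketch that reuses the glob example given in the complex_search tool description below (it matches every markdown file in the vault); the key is a placeholder:

from mcp_obsidian.obsidian import Obsidian

api = Obsidian(api_key="your-plugin-api-key")
results = api.search_json({"glob": ["*.md", {"var": "path"}]})
print(results)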
src/mcp_obsidian/server.py
@@ -0,0 +1,91 @@
import json
import logging
from collections.abc import Sequence
from functools import lru_cache
from typing import Any
import os
from dotenv import load_dotenv
from mcp.server import Server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)

# Load environment variables
load_dotenv()

from . import tools

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-obsidian")

api_key = os.getenv("OBSIDIAN_API_KEY")
if not api_key:
    raise ValueError("OBSIDIAN_API_KEY environment variable required")

app = Server("mcp-knowledge-base")

tool_handlers = {}

def add_tool_handler(tool_class: tools.ToolHandler):
    global tool_handlers

    tool_handlers[tool_class.name] = tool_class

def get_tool_handler(name: str) -> tools.ToolHandler | None:
    if name not in tool_handlers:
        return None

    return tool_handlers[name]

add_tool_handler(tools.ListFilesInDirToolHandler())
add_tool_handler(tools.ListFilesInVaultToolHandler())
add_tool_handler(tools.GetFileContentsToolHandler())
add_tool_handler(tools.SearchToolHandler())
add_tool_handler(tools.PatchContentToolHandler())
add_tool_handler(tools.AppendContentToolHandler())
add_tool_handler(tools.ComplexSearchToolHandler())

@app.list_tools()
async def list_tools() -> list[Tool]:
    """List available tools."""

    return [th.get_tool_description() for th in tool_handlers.values()]

@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool calls for command line run."""

    if not isinstance(arguments, dict):
        raise RuntimeError("arguments must be dictionary")

    tool_handler = get_tool_handler(name)
    if not tool_handler:
        raise ValueError(f"Unknown tool: {name}")

    try:
        return tool_handler.run_tool(arguments)
    except Exception as e:
        logger.error(str(e))
        raise RuntimeError(f"Caught Exception. Error: {str(e)}")

async def main():

    # Import here to avoid issues with event loops
    from mcp.server.stdio import stdio_server

    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )
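In normal use an MCP client launches this server and talks to it over stdio. For a quick local smoke test it can also be started by hand; a sketch, assuming the package is installed and using a placeholder key (tools.py reads OBSIDIAN_API_KEY at import time, so it must be set before the package is imported):

import asyncio
import os

os.environ.setdefault("OBSIDIAN_API_KEY", "your-plugin-api-key")  # placeholder; must exist before importing mcp_obsidian

from mcp_obsidian import server

asyncio.run(server.main())  # serves MCP over stdio until the client disconnects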
src/mcp_obsidian/tools.py
@@ -0,0 +1,326 @@
from collections.abc import Sequence
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)
import json
import os
from . import obsidian

api_key = os.getenv("OBSIDIAN_API_KEY", "")
if api_key == "":
    raise ValueError("OBSIDIAN_API_KEY environment variable required")

TOOL_LIST_FILES_IN_VAULT = "list_files_in_vault"
TOOL_LIST_FILES_IN_DIR = "list_files_in_dir"

class ToolHandler():
    def __init__(self, tool_name: str):
        self.name = tool_name

    def get_tool_description(self) -> Tool:
        raise NotImplementedError()

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        raise NotImplementedError()
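Each concrete tool is a small ToolHandler subclass: get_tool_description() returns the MCP Tool schema and run_tool() executes the call and returns content items. A hypothetical extra handler, purely to illustrate the contract (EchoToolHandler is not part of this commit and relies on the module imports above; a real handler would also be registered in server.py via add_tool_handler):

class EchoToolHandler(ToolHandler):
    """Illustrative only: echoes its argument back as a TextContent."""
    def __init__(self):
        super().__init__("echo")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Echo the given message back to the caller (example handler, not shipped).",
            inputSchema={
                "type": "object",
                "properties": {
                    "message": {"type": "string", "description": "Text to echo back."}
                },
                "required": ["message"]
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        if "message" not in args:
            raise RuntimeError("message argument missing in arguments")

        return [TextContent(type="text", text=args["message"])]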
class ListFilesInVaultToolHandler(ToolHandler):
    def __init__(self):
        super().__init__(TOOL_LIST_FILES_IN_VAULT)

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Lists all files and directories in the root directory of your Obsidian vault.",
            inputSchema={
                "type": "object",
                "properties": {},
                "required": []
            },
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:

        api = obsidian.Obsidian(api_key=api_key)

        files = api.list_files_in_vault()

        return [
            TextContent(
                type="text",
                text=json.dumps(files, indent=2)
            )
        ]

class ListFilesInDirToolHandler(ToolHandler):
    def __init__(self):
        super().__init__(TOOL_LIST_FILES_IN_DIR)

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Lists all files and directories that exist in a specific Obsidian directory.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dirpath": {
                        "type": "string",
                        "description": "Path to list files from (relative to your vault root). Note that empty directories will not be returned."
                    },
                },
                "required": ["dirpath"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:

        if "dirpath" not in args:
            raise RuntimeError("dirpath argument missing in arguments")

        api = obsidian.Obsidian(api_key=api_key)

        files = api.list_files_in_dir(args["dirpath"])

        return [
            TextContent(
                type="text",
                text=json.dumps(files, indent=2)
            )
        ]

class GetFileContentsToolHandler(ToolHandler):
    def __init__(self):
        super().__init__("get_file_contents")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Return the content of a single file in your vault.",
            inputSchema={
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Path to the relevant file (relative to your vault root).",
                        "format": "path"
                    },
                },
                "required": ["filepath"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        if "filepath" not in args:
            raise RuntimeError("filepath argument missing in arguments")

        api = obsidian.Obsidian(api_key=api_key)

        content = api.get_file_contents(args["filepath"])

        return [
            TextContent(
                type="text",
                text=json.dumps(content, indent=2)
            )
        ]
class SearchToolHandler(ToolHandler):
    def __init__(self):
        super().__init__("simple_search")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="""Simple search for documents matching a specified text query across all files in the vault.
            Use this tool when you want to do a simple text search""",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Text to search for in the vault."
                    },
                    "context_length": {
                        "type": "integer",
                        "description": "How much context to return around the matching string (default: 100)",
                        "default": 100
                    }
                },
                "required": ["query"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        if "query" not in args:
            raise RuntimeError("query argument missing in arguments")

        context_length = args.get("context_length", 100)

        api = obsidian.Obsidian(api_key=api_key)
        results = api.search(args["query"], context_length)

        formatted_results = []
        for result in results:
            formatted_matches = []
            for match in result.get('matches', []):
                context = match.get('context', '')
                match_pos = match.get('match', {})
                start = match_pos.get('start', 0)
                end = match_pos.get('end', 0)

                formatted_matches.append({
                    'context': context,
                    'match_position': {'start': start, 'end': end}
                })

            formatted_results.append({
                'filename': result.get('filename', ''),
                'score': result.get('score', 0),
                'matches': formatted_matches
            })

        return [
            TextContent(
                type="text",
                text=json.dumps(formatted_results, indent=2)
            )
        ]
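A direct-invocation sketch for this handler (normally server.call_tool does the dispatching); the query is an example, and the single TextContent returned carries the reshaped JSON built above. Importing the module requires OBSIDIAN_API_KEY to be set:

from mcp_obsidian.tools import SearchToolHandler

handler = SearchToolHandler()
result = handler.run_tool({"query": "quarterly review", "context_length": 50})
print(result[0].text)  # JSON list of {filename, score, matches} entries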
class AppendContentToolHandler(ToolHandler):
    def __init__(self):
        super().__init__("append_content")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Append content to a new or existing file in the vault.",
            inputSchema={
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Path to the file (relative to vault root)",
                        "format": "path"
                    },
                    "content": {
                        "type": "string",
                        "description": "Content to append to the file"
                    }
                },
                "required": ["filepath", "content"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        if "filepath" not in args or "content" not in args:
            raise RuntimeError("filepath and content arguments required")

        api = obsidian.Obsidian(api_key=api_key)
        api.append_content(args.get("filepath", ""), args["content"])

        return [
            TextContent(
                type="text",
                text=f"Successfully appended content to {args['filepath']}"
            )
        ]

class PatchContentToolHandler(ToolHandler):
    def __init__(self):
        super().__init__("patch_content")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="Insert content into an existing note relative to a heading, block reference, or frontmatter field.",
            inputSchema={
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "Path to the file (relative to vault root)",
                        "format": "path"
                    },
                    "operation": {
                        "type": "string",
                        "description": "Operation to perform (append, prepend, or replace)",
                        "enum": ["append", "prepend", "replace"]
                    },
                    "target_type": {
                        "type": "string",
                        "description": "Type of target to patch",
                        "enum": ["heading", "block", "frontmatter"]
                    },
                    "target": {
                        "type": "string",
                        "description": "Target identifier (heading path, block reference, or frontmatter field)"
                    },
                    "content": {
                        "type": "string",
                        "description": "Content to insert"
                    }
                },
                "required": ["filepath", "operation", "target_type", "target", "content"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        required = ["filepath", "operation", "target_type", "target", "content"]
        missing = [key for key in required if key not in args]
        if missing:
            raise RuntimeError(f"Missing required arguments: {', '.join(missing)}")

        api = obsidian.Obsidian(api_key=api_key)
        api.patch_content(
            args.get("filepath", ""),
            args.get("operation", ""),
            args.get("target_type", ""),
            args.get("target", ""),
            args.get("content", "")
        )

        return [
            TextContent(
                type="text",
                text=f"Successfully patched content in {args['filepath']}"
            )
        ]
class ComplexSearchToolHandler(ToolHandler):
    def __init__(self):
        super().__init__("complex_search")

    def get_tool_description(self):
        return Tool(
            name=self.name,
            description="""Complex search for documents using a JsonLogic query.
            Supports standard JsonLogic operators plus 'glob' and 'regexp' for pattern matching. Results must be non-falsy.

            Use this tool when you want to do a complex search, e.g. for all documents with certain tags etc.
            """,
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "object",
                        "description": "JsonLogic query object. Example: {\"glob\": [\"*.md\", {\"var\": \"path\"}]} matches all markdown files"
                    }
                },
                "required": ["query"]
            }
        )

    def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
        if "query" not in args:
            raise RuntimeError("query argument missing in arguments")

        api = obsidian.Obsidian(api_key=api_key)
        results = api.search_json(args.get("query", ""))

        return [
            TextContent(
                type="text",
                text=json.dumps(results, indent=2)
            )
        ]
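Likewise for the complex search handler, a direct-invocation sketch that reuses the glob example from its own schema description (matches every markdown file in the vault); importing the module requires OBSIDIAN_API_KEY to be set:

from mcp_obsidian.tools import ComplexSearchToolHandler

handler = ComplexSearchToolHandler()
result = handler.run_tool({"query": {"glob": ["*.md", {"var": "path"}]}})
print(result[0].text)  # JSON-encoded search results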