renamed to mcp-obsidian

This commit is contained in:
Markus Pfundstein
2024-12-04 10:22:24 +01:00
parent 616309c2cb
commit a27363c643
7 changed files with 15 additions and 31 deletions

View File

@@ -0,0 +1,9 @@
from . import server
import asyncio
def main():
"""Main entry point for the package."""
asyncio.run(server.main())
# Optionally expose other important items at package level
__all__ = ['main', 'server']

View File

@@ -0,0 +1,134 @@
import requests
import urllib.parse
from typing import Any
class Obsidian():
def __init__(
self,
api_key: str,
protocol: str = 'https',
host: str = "127.0.0.1",
port: int = 27124,
verify_ssl: bool = False,
):
self.api_key = api_key
self.protocol = protocol
self.host = host
self.port = port
self.verify_ssl = verify_ssl
self.timeout = (3, 6)
def get_base_url(self) -> str:
return f'{self.protocol}://{self.host}:{self.port}'
def _get_headers(self) -> dict:
headers = {
'Authorization': f'Bearer {self.api_key}'
}
return headers
def _safe_call(self, f) -> Any:
try:
return f()
except requests.HTTPError as e:
error_data = e.response.json() if e.response.content else {}
code = error_data.get('errorCode', -1)
message = error_data.get('message', '<unknown>')
raise Exception(f"Error {code}: {message}")
except requests.exceptions.RequestException as e:
raise Exception(f"Request failed: {str(e)}")
def list_files_in_vault(self) -> Any:
url = f"{self.get_base_url()}/vault/"
def call_fn():
response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return response.json()['files']
return self._safe_call(call_fn)
def list_files_in_dir(self, dirpath: str) -> Any:
url = f"{self.get_base_url()}/vault/{dirpath}/"
def call_fn():
response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return response.json()['files']
return self._safe_call(call_fn)
def get_file_contents(self, filepath: str) -> Any:
url = f"{self.get_base_url()}/vault/{filepath}"
def call_fn():
response = requests.get(url, headers=self._get_headers(), verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return response.text
return self._safe_call(call_fn)
def search(self, query: str, context_length: int = 100) -> Any:
url = f"{self.get_base_url()}/search/simple/"
params = {
'query': query,
'contextLength': context_length
}
def call_fn():
response = requests.post(url, headers=self._get_headers(), params=params, verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return response.json()
return self._safe_call(call_fn)
def append_content(self, filepath: str, content: str) -> Any:
url = f"{self.get_base_url()}/vault/{filepath}"
def call_fn():
response = requests.post(
url,
headers=self._get_headers() | {'Content-Type': 'text/markdown'},
data=content,
verify=self.verify_ssl,
timeout=self.timeout
)
response.raise_for_status()
return None
return self._safe_call(call_fn)
def patch_content(self, filepath: str, operation: str, target_type: str, target: str, content: str) -> Any:
url = f"{self.get_base_url()}/vault/{filepath}"
headers = self._get_headers() | {
'Content-Type': 'text/markdown',
'Operation': operation,
'Target-Type': target_type,
'Target': urllib.parse.quote(target)
}
def call_fn():
response = requests.patch(url, headers=headers, data=content, verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return None
return self._safe_call(call_fn)
def search_json(self, query: dict) -> Any:
url = f"{self.get_base_url()}/search/"
headers = self._get_headers() | {
'Content-Type': 'application/vnd.olrapi.jsonlogic+json'
}
def call_fn():
response = requests.post(url, headers=headers, json=query, verify=self.verify_ssl, timeout=self.timeout)
response.raise_for_status()
return response.json()
return self._safe_call(call_fn)

View File

@@ -0,0 +1,91 @@
import json
import logging
from collections.abc import Sequence
from functools import lru_cache
from typing import Any
import os
from dotenv import load_dotenv
from mcp.server import Server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
load_dotenv()
from . import tools
# Load environment variables
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-obsidian")
api_key = os.getenv("OBSIDIAN_API_KEY")
if not api_key:
raise ValueError("OBSIDIAN_API_KEY environment variable required")
app = Server("mcp-knowledge-base")
tool_handlers = {}
def add_tool_handler(tool_class: tools.ToolHandler):
global tool_handlers
tool_handlers[tool_class.name] = tool_class
def get_tool_handler(name: str) -> tools.ToolHandler | None:
if name not in tool_handlers:
return None
return tool_handlers[name]
add_tool_handler(tools.ListFilesInDirToolHandler())
add_tool_handler(tools.ListFilesInVaultToolHandler())
add_tool_handler(tools.GetFileContentsToolHandler())
add_tool_handler(tools.SearchToolHandler())
add_tool_handler(tools.PatchContentToolHandler())
add_tool_handler(tools.AppendContentToolHandler())
add_tool_handler(tools.ComplexSearchToolHandler())
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [th.get_tool_description() for th in tool_handlers.values()]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls for command line run."""
if not isinstance(arguments, dict):
raise RuntimeError("arguments must be dictionary")
tool_handler = get_tool_handler(name)
if not tool_handler:
raise ValueError(f"Unknown tool: {name}")
try:
return tool_handler.run_tool(arguments)
except Exception as e:
logger.error(str(e))
raise RuntimeError(f"Caught Exception. Error: {str(e)}")
async def main():
# Import here to avoid issues with event loops
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)

326
src/mcp_obsidian/tools.py Normal file
View File

@@ -0,0 +1,326 @@
from collections.abc import Sequence
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
import json
import os
from . import obsidian
api_key = os.getenv("OBSIDIAN_API_KEY", "")
if api_key == "":
raise ValueError("OBSIDIAN_API_KEY environment variable required")
TOOL_LIST_FILES_IN_VAULT = "list_files_in_vault"
TOOL_LIST_FILES_IN_DIR = "list_files_in_dir"
class ToolHandler():
def __init__(self, tool_name: str):
self.name = tool_name
def get_tool_description(self) -> Tool:
raise NotImplementedError()
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
raise NotImplementedError()
class ListFilesInVaultToolHandler(ToolHandler):
def __init__(self):
super().__init__(TOOL_LIST_FILES_IN_VAULT)
def get_tool_description(self):
return Tool(
name=self.name,
description="Lists all files and directories in the root directory of your Obsidian vault.",
inputSchema={
"type": "object",
"properties": {},
"required": []
},
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
api = obsidian.Obsidian(api_key=api_key)
files = api.list_files_in_vault()
return [
TextContent(
type="text",
text=json.dumps(files, indent=2)
)
]
class ListFilesInDirToolHandler(ToolHandler):
def __init__(self):
super().__init__(TOOL_LIST_FILES_IN_DIR)
def get_tool_description(self):
return Tool(
name=self.name,
description="Lists all files and directories that exist in a specific Obsidian directory.",
inputSchema={
"type": "object",
"properties": {
"dirpath": {
"type": "string",
"description": "Path to list files from (relative to your vault root). Note that empty directories will not be returned."
},
},
"required": ["dirpath"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
if "dirpath" not in args:
raise RuntimeError("dirpath argument missing in arguments")
api = obsidian.Obsidian(api_key=api_key)
files = api.list_files_in_dir(args["dirpath"])
return [
TextContent(
type="text",
text=json.dumps(files, indent=2)
)
]
class GetFileContentsToolHandler(ToolHandler):
def __init__(self):
super().__init__("get_file_contents")
def get_tool_description(self):
return Tool(
name=self.name,
description="Return the content of a single file in your vault.",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "Path to the relevant file (relative to your vault root).",
"format": "path"
},
},
"required": ["filepath"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
if "filepath" not in args:
raise RuntimeError("filepath argument missing in arguments")
api = obsidian.Obsidian(api_key=api_key)
content = api.get_file_contents(args["filepath"])
return [
TextContent(
type="text",
text=json.dumps(content, indent=2)
)
]
class SearchToolHandler(ToolHandler):
def __init__(self):
super().__init__("simple_search")
def get_tool_description(self):
return Tool(
name=self.name,
description="""Simple search for documents matching a specified text query across all files in the vault.
Use this tool when you want to do a simple text search""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Text to a simple search for in the vault."
},
"context_length": {
"type": "integer",
"description": "How much context to return around the matching string (default: 100)",
"default": 100
}
},
"required": ["query"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
if "query" not in args:
raise RuntimeError("query argument missing in arguments")
context_length = args.get("context_length", 100)
api = obsidian.Obsidian(api_key=api_key)
results = api.search(args["query"], context_length)
formatted_results = []
for result in results:
formatted_matches = []
for match in result.get('matches', []):
context = match.get('context', '')
match_pos = match.get('match', {})
start = match_pos.get('start', 0)
end = match_pos.get('end', 0)
formatted_matches.append({
'context': context,
'match_position': {'start': start, 'end': end}
})
formatted_results.append({
'filename': result.get('filename', ''),
'score': result.get('score', 0),
'matches': formatted_matches
})
return [
TextContent(
type="text",
text=json.dumps(formatted_results, indent=2)
)
]
class AppendContentToolHandler(ToolHandler):
def __init__(self):
super().__init__("append_content")
def get_tool_description(self):
return Tool(
name=self.name,
description="Append content to a new or existing file in the vault.",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "Path to the file (relative to vault root)",
"format": "path"
},
"content": {
"type": "string",
"description": "Content to append to the file"
}
},
"required": ["filepath", "content"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
if "filepath" not in args or "content" not in args:
raise RuntimeError("filepath and content arguments required")
api = obsidian.Obsidian(api_key=api_key)
api.append_content(args.get("filepath", ""), args["content"])
return [
TextContent(
type="text",
text=f"Successfully appended content to {args['filepath']}"
)
]
class PatchContentToolHandler(ToolHandler):
def __init__(self):
super().__init__("patch_content")
def get_tool_description(self):
return Tool(
name=self.name,
description="Insert content into an existing note relative to a heading, block reference, or frontmatter field.",
inputSchema={
"type": "object",
"properties": {
"filepath": {
"type": "string",
"description": "Path to the file (relative to vault root)",
"format": "path"
},
"operation": {
"type": "string",
"description": "Operation to perform (append, prepend, or replace)",
"enum": ["append", "prepend", "replace"]
},
"target_type": {
"type": "string",
"description": "Type of target to patch",
"enum": ["heading", "block", "frontmatter"]
},
"target": {
"type": "string",
"description": "Target identifier (heading path, block reference, or frontmatter field)"
},
"content": {
"type": "string",
"description": "Content to insert"
}
},
"required": ["filepath", "operation", "target_type", "target", "content"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
required = ["filepath", "operation", "target_type", "target", "content"]
if not all(key in args for key in required):
raise RuntimeError(f"Missing required arguments: {', '.join(required)}")
api = obsidian.Obsidian(api_key=api_key)
api.patch_content(
args.get("filepath", ""),
args.get("operation", ""),
args.get("target_type", ""),
args.get("target", ""),
args.get("content", "")
)
return [
TextContent(
type="text",
text=f"Successfully patched content in {args['filepath']}"
)
]
class ComplexSearchToolHandler(ToolHandler):
def __init__(self):
super().__init__("complex_search")
def get_tool_description(self):
return Tool(
name=self.name,
description="""Complex search for documents using a JsonLogic query.
Supports standard JsonLogic operators plus 'glob' and 'regexp' for pattern matching. Results must be non-falsy.
Use this tool when you want to do a complex search, e.g. for all documents with certain tags etc.
""",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "object",
"description": "JsonLogic query object. Example: {\"glob\": [\"*.md\", {\"var\": \"path\"}]} matches all markdown files"
}
},
"required": ["query"]
}
)
def run_tool(self, args: dict) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
if "query" not in args:
raise RuntimeError("query argument missing in arguments")
api = obsidian.Obsidian(api_key=api_key)
results = api.search_json(args.get("query", ""))
return [
TextContent(
type="text",
text=json.dumps(results, indent=2)
)
]