Memory integrity and watermarking toolkit for AI agent long-term memory systems.
MemMark detects memory poisoning, verifies provenance, generates integrity manifests, and embeds cryptographic watermarks in AI agent memory systems — ensuring the memories your agent trusts are actually legitimate.
Documentation: carlos-projects.github.io/memmark
| Feature | Description |
|---|---|
| 🏷️ Memory Watermarking | HMAC-SHA256 + PBKDF2 watermarks with entropy salt |
| 🛡️ Poisoning Detection | Configurable pattern-based injection & manipulation detection |
| 🔍 Provenance Tracking | SHA-256 chain hashing with cycle-safe graph analysis |
| 📋 Integrity Manifests | Generate & verify SHA-256 manifests per entry & state |
| 📊 Memory Diff | Compare memory states (added, removed, modified entries) |
| 🔬 Memory Forensics | Temporal, content & source anomaly scoring |
| 📝 Policy Generation | MCPGuard-compatible YAML policies from scan results |
| 🔄 Pluggable Store | FileMemoryStore, InMemoryMemoryStore, custom backends |
| 🧩 Composable Pipeline | ScanPipeline + ScanStage for custom analysis workflows |
| 📋 Structured Logging | JSON logging with correlation IDs for pipeline tracing |
pip install memmark-agent
memmark scan memory.json -k my-secret-key
# Inject watermarks
memmark watermark memory.json --action inject --key my-key -o watermarked.json
# Detect watermarks
memmark watermark watermarked.json --action detect --key my-key
# Integrity manifest
memmark manifest memory.json -o manifest.json
# Verify against manifest
memmark verify memory.json --manifest manifest.json
# Generate MCPGuard policy
memmark generate-policy memory.json -o policy.yaml
from memmark import run_full_scan
memories = [{"id": "mem-001", "content": "User likes dark mode"}]
result = run_full_scan(memories, watermark_key="my-secret")
for f in result.findings:
print(f" [{f.severity}] {f.description}")
from memmark import ScanPipeline
pipeline = ScanPipeline.with_default_stages(watermark_key="my-secret")
result = pipeline.run(memories, scan_id="custom-scan")
# Async variant
result = await pipeline.arun(memories)
from memmark import ScanStage, PipelineContext
class CustomStage(ScanStage):
def run(self, ctx: PipelineContext) -> None:
# Access ctx.memories, ctx.findings, ctx.metadata
...
pipeline = ScanPipeline.with_default_stages(watermark_key="k")
pipeline.add_stage(CustomStage())
from memmark import FileMemoryStore, InMemoryMemoryStore, MemoryScanner
store = FileMemoryStore("memories.json")
memories = store.read()
scanner = MemoryScanner()
memories = scanner.load_memory(store) # auto-detects MemoryStore
CLI (typer)
└─ ScanPipeline (composable stages)
├─ PoisoningStage — configurable pattern injection/manipulation detection
├─ WatermarkStage — HMAC-SHA256 + PBKDF2 verification
└─ ForensicsStage — temporal/content/source anomaly scoring
└─ WatermarkInjector / WatermarkDetector
└─ PoisoningDetector / PoisoningClassifier / PoisoningRemediation
└─ ProvenanceTracker / ProvenanceVerifier / ProvenanceGraph
└─ IntegrityManifest / MemoryDiff / MemoryForensics
└─ MCPGuardPolicy
└─ MemoryStore (FileMemoryStore / InMemoryMemoryStore)
# Install dev + docs dependencies
pip install -e ".[dev,docs]
# Run tests with coverage
make test # or: python -m pytest tests/ -v
# Lint + type check
make lint # ruff check src/ tests/
make typecheck # mypy src/
# Build docs
make serve-docs # mkdocs serve → localhost:8000
# Build package
make build # python -m build
# Run pre-commit hooks
make precommit # pre-commit run --all-files
# Full CI pipeline
make all # install → lint → typecheck → test → coverage
| Project | Integration |
|---|---|
| MCPGuard | MemMark generates memory protection policies |
| MCPscop | MemMark reports consumable by MCPscop dashboard |
| mcp-taxonomy | Standardized finding classification |
MIT — See LICENSE.
Carlos-Projects — GitHub