Skip to content

Python

The xmemory-ai package gives your Python code persistent, structured memory. Write free-form text, have it automatically extracted into typed objects, and query it back in natural language.

For MCP-based integration (no SDK needed), see the MCP guide.

API key: To use xmemory APIs or integrations, you need an API key. Please register your interest at https://xmemory.ai and we will reach out to give access. Copy and securely store the key. Never share your API key publicly.


Terminal window
pip install xmemory-ai

Requires Python 3.10+ and pydantic>=2.0.


This is the full flow — from schema to stored knowledge to answers — in a single script:

import yaml
from xmemory import XmemoryClient, SchemaType
# Connect (reads XMEM_API_KEY from env if api_key is not passed)
client = XmemoryClient(api_key="your-api-key")
# List available clusters
clusters = client.admin.list_clusters()
cluster_id = clusters[0].id
# Describe what you want to remember
schema = client.admin.generate_schema(
cluster_id,
"Track contacts with name, email, company, and notes.",
)
# Create a memory instance from that schema
inst = client.admin.create_instance(
cluster_id=cluster_id,
name="contacts",
schema_text=yaml.dump(schema.data_schema),
schema_type=SchemaType.YML,
)
# inst is an InstanceAPI handle — all subsequent calls go through it.
# Write some information
inst.write("Alice Johnson works at Acme Corp. Her email is alice@acme.com.")
inst.write("Bob Lee is a designer at Globex. He joined last Monday.")
# Read it back
result = inst.read("What is Alice's email?")
print(result.reader_result) # {"answer": "alice@acme.com"}
result = inst.read("Who joined recently?")
print(result.reader_result) # {"answer": "Bob Lee joined last Monday."}
client.close()

Once you have an instance ID, skip the schema step on subsequent runs:

client = XmemoryClient(api_key="your-api-key")
inst = client.instance("your-saved-instance-id")

ParameterEnv varDefaultDescription
api_keyXMEM_API_KEYNoneAPI key for authentication
urlXMEM_API_URLhttps://api.xmemory.aiAPI base URL
timeout60Default request timeout in seconds
http_clientNoneExternal httpx.Client (you manage its lifecycle)

All parameters are keyword-only. The API key and URL fall back to their environment variables when not passed.

with XmemoryClient(api_key="your-api-key") as client:
inst = client.instance("your-instance-id")
result = inst.read("What is Alice's email?")

The async client mirrors the sync API. Use it in asyncio code:

from xmemory import AsyncXmemoryClient
async def main():
async with AsyncXmemoryClient(api_key="your-api-key") as client:
inst = client.instance("your-instance-id")
result = await inst.read("What is Alice's email?")
print(result.reader_result)
import asyncio
asyncio.run(main())

Send free-form text — xmemory extracts structured objects according to your schema and merges them into the knowledge graph.

inst = client.instance("your-instance-id")
resp = inst.write("Carol is a senior engineer at Initech. Her email is carol@initech.com.")
print(resp.cleaned_objects) # the objects that were stored
print(resp.trace_id) # request trace id when available

WriteResult, ReadResult, and ExtractResult include trace_id, which is useful when you want to correlate SDK calls with API logs.

Control the speed/accuracy tradeoff:

from xmemory import ExtractionLogic
resp = inst.write("...", extraction_logic=ExtractionLogic.FAST)
ValueWhen to use
DEEPImportant or complex information (default)
FASTHigh-volume, low-stakes writes

Ask questions in natural language. xmemory translates them into SQL against the knowledge graph and returns a formatted answer.

resp = inst.read("Who works at Acme Corp?")
print(resp.reader_result)
print(resp.trace_id)
from xmemory import ReadMode
# Plain-text answer (default)
resp = inst.read("What is Alice's email?", read_mode=ReadMode.SINGLE_ANSWER)
# → {"answer": "alice@acme.com"}
# Structured objects and relations
resp = inst.read("Show all contacts", read_mode=ReadMode.XRESPONSE)
# → {"objects": [...], "relations": [...]}
# Raw SQL result sets
resp = inst.read("List all contacts", read_mode=ReadMode.RAW_TABLES)
# → {"tables": [...]}

For latency-sensitive code, enqueue a write and return immediately:

resp = inst.write_async("Dave manages the London office.")
print(resp.write_id) # use this to poll for completion

Then check the status:

status = inst.write_status(resp.write_id)
print(status.write_status)
# → WriteQueueStatus.QUEUED | PROCESSING | COMPLETED | FAILED | NOT_FOUND

Do not call read immediately after write_async — the data may not be committed yet. Poll with write_status until COMPLETED, or use write (synchronous) when you need to read right after.


Preview what xmemory would extract from a piece of text, without storing anything:

resp = inst.extract("Dave manages the London office.")
print(resp.objects_extracted)
print(resp.trace_id)

Accepts the same extraction_logic parameter as write.


The describe() method returns agent-facing tool descriptions enriched with the instance’s actual schema. Use it to tell an LLM what tools are available and how to call them.

desc = inst.describe()
# Plain text — inject into a system prompt
print(desc.as_text())
# Anthropic tool-use format
tools = desc.as_anthropic_tools()
# OpenAI function-calling format
tools = desc.as_openai_tools()

Results are cached locally for 5 minutes. To force a refresh (e.g. after updating the schema):

inst.clear_describe_cache()
desc = inst.describe()

as_text() shows tools as method signatures by default. Pass include_http=True to also show HTTP method and path for raw REST callers.


clusters = client.admin.list_clusters()
for c in clusters:
print(f"{c.id}: {c.name}")

Describe what you want to track in plain language:

schema = client.admin.generate_schema(
cluster_id,
"Track user preferences, open tasks with priorities, and conversation history.",
)
print(schema.data_schema)

Object names use CamelCase (UserPreferences, OpenTask). The generation endpoint handles naming conventions automatically.

import yaml
from xmemory import SchemaType
inst = client.admin.create_instance(
cluster_id=cluster_id,
name="my-memory",
schema_text=yaml.dump(schema.data_schema),
schema_type=SchemaType.YML,
description="Optional description",
)
# inst is a bound InstanceAPI — use inst.write(), inst.read(), etc.

For a purely additive change (new objects/fields/relations), update directly:

new_schema = client.admin.generate_schema(
cluster_id,
"Add an assignee field to tasks.",
current_yml_schema=current_schema,
)
client.admin.update_instance_schema(
instance_id, yaml.dump(new_schema.data_schema), SchemaType.YML
)

Existing data is preserved — only the new shape is applied.

Evolving a schema (rename / remove / type change)

Section titled “Evolving a schema (rename / remove / type change)”

Non-additive changes preserve data via a structured migration plan. Use enhance_schema to get the plan, dry_run_migration to preview the DDL, then update_instance_schema to apply it. confirm_destructive=True is required for ops that drop data.

current = client.admin.get_instance_schema(instance_id).data_schema
# 1. Enhance — new schema + an executor-ready migration plan.
enhanced = client.admin.enhance_schema(
cluster_id,
"Rename Person.mail to Person.email.",
yaml.dump(current),
)
print(enhanced.summary)
for op in enhanced.migration_plan.ops:
print(op)
new_yaml = yaml.dump(enhanced.data_schema)
# 2. Dry-run — preview the DDL, apply nothing.
preview = client.admin.dry_run_migration(
instance_id, new_yaml, SchemaType.YML, migration_plan=enhanced.migration_plan,
)
print(preview.statements)
# 3. Update — apply. A rename is non-destructive, so confirm_destructive stays False.
info = client.admin.update_instance_schema(
instance_id, new_yaml, SchemaType.YML,
migration_plan=enhanced.migration_plan, confirm_destructive=False,
)
print(info.migration_id, info.prior_version, "->", info.new_version)

Suggestion engine (review → decide → apply)

Section titled “Suggestion engine (review → decide → apply)”

xmemory watches read traffic and, on demand, surfaces a single rolling proposal of schema improvements. The flow is three calls — never wrap them; the checkpoints are the point.

from xmemory import DecisionInput
inst = client.instance(instance_id)
# 1. Review — proposal + optimistic-concurrency token.
review = inst.review_suggestions()
if review.status == "evolution_in_progress":
print(f"Migration in flight; retry in {review.retry_after_seconds}s")
else:
proposal = review.proposal
for item in proposal.items:
print(item.item_fingerprint, item.rationale, item.op)
# 2. Decide — accept / reject / defer per item, in bulk.
decided = inst.decide_suggestions(
proposal.proposal_version,
[DecisionInput(item_fingerprint=i.item_fingerprint, decision="accept")
for i in proposal.items],
)
# 3. Apply — commit accepted decisions as one migration.
applied = inst.apply_pending_decisions(decided.next_proposal_version)
print(applied.status, applied.summary)
page = client.admin.list_migrations(instance_id, limit=20)
for r in page.items:
print(r.id, r.source, r.prior_version, "->", r.new_version)
detail = client.admin.get_migration(instance_id, page.items[0].id, include_yaml=True)
print(detail.yaml_before, detail.yaml_after)

Migration ops are exported as typed models (MigrationPlan, AddField, RenameField, RemoveObject, …). ProposalItem.op and MigrationRecord.ops stay as raw dicts for forward compatibility — use parse_migration_op(...) to validate them.

instances = client.admin.list_instances()
info = client.admin.get_instance(instance_id)
client.admin.delete_instance(instance_id)

All errors raise XmemoryAPIError. The exception carries an optional .status (HTTP status code), .code (structured error code, when the server returned one), and .details.

from xmemory import XmemoryAPIError, XmemoryHealthCheckError
# Check connectivity
try:
client.check_health()
except XmemoryHealthCheckError as e:
print(f"API unreachable: {e}")
# Handle operation errors
try:
inst.write("...")
except XmemoryAPIError as e:
print(f"Error (HTTP {e.status}): {e}")

XmemoryHealthCheckError is a subclass of XmemoryAPIError, so catching XmemoryAPIError covers both.

The schema-evolution methods set .code to a structured error type you can match on — e.g. stale_proposal_version, destructive_confirmation_required, non_additive_change_requires_plan:

try:
inst.apply_pending_decisions(token)
except XmemoryAPIError as e:
if e.code == "stale_proposal_version":
review = inst.review_suggestions() # re-review and retry

MethodReturnsDescription
XmemoryClient(url=None, *, timeout=60, api_key=None, http_client=None)Create a sync client. url is the only positional arg; the rest are keyword-only. api_key and url fall back to XMEM_API_KEY / XMEM_API_URL.
AsyncXmemoryClient(url=None, *, timeout=60, api_key=None, http_client=None)Create an async client (same signature)
client.instance(instance_id)InstanceAPIGet a handle for data operations on an instance
client.check_health()Raises XmemoryHealthCheckError if the API is unreachable
client.close()Close the underlying HTTP client
MethodReturnsDescription
list_clusters(ids?, timeout?)list[ClusterInfo]List clusters
get_cluster(cluster_id, timeout?)ClusterInfoGet a cluster by ID
generate_schema(cluster_id, description, *, current_yml_schema?, timeout?)GenerateSchemaResultGenerate a schema from a plain-language description
create_instance(cluster_id, name, schema_text, schema_type, *, description?, schema_description?, timeout?)InstanceAPICreate a new instance; returns a bound handle
list_instances(ids?, timeout?)list[InstanceInfo]List instances
get_instance(instance_id, timeout?)InstanceInfoGet an instance by ID
delete_instance(instance_id, timeout?)list[str]Delete an instance
get_instance_schema(instance_id, timeout?)InstanceSchemaInfoGet an instance’s schema
update_instance_schema(instance_id, schema_text, schema_type, *, migration_plan?, confirm_destructive?, timeout?)InstanceInfoUpdate a schema; pass migration_plan for non-additive changes
update_instance_metadata(instance_id, name, description?, timeout?)InstanceInfoUpdate instance name and description
enhance_schema(cluster_id, schema_description, current_yml_schema, *, timeout?)EnhanceSchemaResultEvolve a schema → new schema + migration plan
dry_run_migration(instance_id, schema_text, schema_type, *, migration_plan?, confirm_destructive?, timeout?)DryRunResultPreview a migration’s DDL
list_migrations(instance_id, *, limit?, before_id?, include_yaml?, timeout?)ListMigrationsResultList applied migrations (newest first)
get_migration(instance_id, migration_id, *, include_yaml?, timeout?)MigrationRecordGet a single migration record
MethodReturnsDescription
write(text, *, extraction_logic?, diff_engine?, timeout?)WriteResultExtract and persist objects from text
write_async(text, *, extraction_logic?, diff_engine?, timeout?)AsyncWriteResultEnqueue a write; returns write_id immediately
write_status(write_id, timeout?)WriteStatusResultPoll the status of an async write
read(query, *, read_mode?, read_id?, timeout?)ReadResultQuery the instance in natural language
extract(text, *, extraction_logic?, timeout?)ExtractResultExtract objects without writing them
get_schema(timeout?)InstanceSchemaInfoGet this instance’s schema
review_suggestions(*, session_id?, timeout?)ReviewSuggestionsResultGet the rolling schema-improvement proposal
decide_suggestions(proposal_version, decisions, *, session_id?, timeout?)DecideSuggestionsResultRecord accept/reject/defer decisions in bulk
apply_pending_decisions(proposal_version, *, session_id?, timeout?)ApplyPendingDecisionsResultApply accepted decisions as one migration
describe(timeout?)DescribeResultGet agent-facing tool descriptions (cached 5 min)
clear_describe_cache()NoneForce next describe() to fetch fresh data
idstrThe instance ID (property)
EnumValues
SchemaTypeYML, JSON
ExtractionLogicFAST, REGULAR, DEEP
ReadModeSINGLE_ANSWER, RAW_TABLES, XRESPONSE
WriteQueueStatusQUEUED, PROCESSING, COMPLETED, FAILED, NOT_FOUND
ExceptionParentAttributes
XmemoryAPIErrorException.status, .code, .details (each None when absent)
XmemoryHealthCheckErrorXmemoryAPIError.status (HTTP status code or None)