Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/crud/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,19 @@ async def query_documents(
return docs


def _normalize_content(content: str) -> str:
"""Normalize document content for exact-match deduplication.

Content is compared after trimming surrounding whitespace and lowercasing

The SQL filter in ``create_documents`` must stay in sync with this:
``lower(regexp_replace(content, '^\\s+|\\s+$', '', 'g'))``. Postgres'
``trim()`` only strips spaces, so a regex is used to match Python's
``str.strip()`` across all whitespace.
"""
return content.strip().lower()
Comment on lines +431 to +441

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

PostgreSQL regexp_replace \s whitespace class match non-breaking space unicode

💡 Result:

In PostgreSQL, the \s shorthand in regular expressions represents the [[:space:]] POSIX character class [1][2]. Whether this class includes the non-breaking space (U+00A0) depends on your database's locale (LC_CTYPE) and encoding [3]. In many standard environments, [[:space:]] does not include the non-breaking space, which means \s will fail to match it [4][3]. To reliably match or replace non-breaking spaces along with standard whitespace, you must explicitly include the non-breaking space character in your regular expression [4][5]. You can use the following approach to replace both standard whitespace and non-breaking spaces globally: SELECT regexp_replace(your_column, '[\s\u00A0]+', ' ', 'g'); In this expression: - \s matches standard whitespace characters defined by your system's [[:space:]] class [1][2]. - \u00A0 explicitly adds the non-breaking space to the character class [4]. - The g flag ensures all occurrences are replaced, rather than just the first one [4]. If you need to target a wider range of Unicode whitespace or separator characters, you can expand the character class further [6]: SELECT regexp_replace(your_column, '[\s\u00A0\u1680\u2000-\u200A\u202F\u205F\u3000]+', ' ', 'g'); Because the behavior of \s is locale-dependent, testing against your specific data and locale settings is recommended [3]. If you are only dealing with simple replacements, the non-regex replace() function is often more performant and avoids locale-related ambiguities [7].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Map relevant files/symbols first
git ls-files src/crud/document.py
ast-grep outline src/crud/document.py --view expanded || true

# Show the relevant section with line numbers
sed -n '380,520p' src/crud/document.py

# Find the SQL normalization expression and its callers
rg -n "regexp_replace\\(|_normalize_content|create_documents|lower\\(" src/crud/document.py src -g '!**/__pycache__/**'

Repository: plastic-labs/honcho

Length of output: 10062


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect exact behavior of PostgreSQL regex whitespace semantics from local docs if present
python3 - <<'PY'
from pathlib import Path
p = Path('src/crud/document.py')
text = p.read_text()
for needle in ["regexp_replace", "_normalize_content", "strip()", "lower()"]:
    print(f"FOUND {needle}:", needle in text)
PY

Repository: plastic-labs/honcho

Length of output: 255


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read the specific region only if it exists
if [ -f src/crud/document.py ]; then
  nl -ba src/crud/document.py | sed -n '410,460p'
fi

Repository: plastic-labs/honcho

Length of output: 197


🌐 Web query:

PostgreSQL lower() locale-dependent Unicode casing Python str.lower difference

💡 Result:

The primary difference between PostgreSQL's lower function and Python's str.lower method lies in their reliance on locale settings and their adherence to Unicode standards [1][2][3][4]. PostgreSQL lower PostgreSQL's lower function is locale-dependent [1][2]. Its behavior is governed by the database's collation (specifically the LC_CTYPE setting of the collation) [5][6][7]. - When using libc-based collations, lower relies on the operating system's locale definitions, which can vary significantly across environments [6][7]. - When using ICU-based collations, lower leverages ICU for Unicode-compliant case mapping, which provides more consistent and robust support for various languages, including those with special casing rules like Turkish ('I' to 'ı') [5][6][8][7]. - Because it is tied to the collation, the same query might produce different results if the database is initialized with a different locale or if the expression uses an explicit COLLATE clause [5][6]. Python str.lower Python's str.lower method implements a fixed, Unicode-standard-based conversion that is not locale-aware [3][4]. - It performs a "simple" case mapping according to the Unicode standard, which does not change regardless of the system's locale settings [3][4]. - It does not account for language-specific requirements such as the Turkish "dotted/dotless I" distinction (where 'I' should map to 'ı' rather than 'i') [3][4]. - For case-insensitive comparisons, Python's str.casefold is generally preferred over str.lower because it handles more complex Unicode mappings (e.g., German 'ß' to 'ss') to facilitate caseless matching [3]. Summary of Differences 1. Locale Sensitivity: PostgreSQL lower is sensitive to the configured collation (locale) [1][5], whereas Python str.lower is locale-insensitive and always follows the same Unicode mapping [3][4]. 2. Language Support: PostgreSQL can support language-specific casing rules (e.g., Turkish) if the appropriate ICU collation is used [5][8]. Python's built-in str.lower does not support these unless external libraries like PyICU are used [3]. 3. Consistency: Python provides predictable, identical results across all environments [4]. PostgreSQL behavior for lower depends on the server's environment or collation configuration [1][6].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the surrounding create_documents logic and any dedup/uniqueness checks
nl -ba src/crud/document.py | sed -n '300,520p'

Repository: plastic-labs/honcho

Length of output: 197


Unicode normalization can drift between Python and PostgreSQL here. str.strip()/str.lower() are Unicode-aware, while regexp_replace(..., '\\s', ...) and PostgreSQL lower() are locale-dependent, so the prefetch can miss rows for inputs with non-ASCII whitespace or locale-specific casing and insert a duplicate. Match candidates in Python, or document the supported normalization boundary.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/crud/document.py` around lines 431 - 441, The normalization used by
_normalize_content and the matching SQL in create_documents can diverge for
Unicode whitespace and locale-specific casing, so update the deduplication flow
to use the same normalization boundary on both sides or explicitly
constrain/document the supported cases. In particular, review _normalize_content
and the SQL filter it must stay in sync with, and either move candidate matching
to Python using the same str.strip()/lower() behavior or adjust the PostgreSQL
prefetch logic so it cannot miss equivalent rows and create duplicates.



async def create_documents(
db: AsyncSession,
documents: list[schemas.DocumentCreate],
Expand All @@ -440,12 +453,17 @@ async def create_documents(
"""
Create multiple documents with optional duplicate detection.

The ``deduplicate`` flag additionally enables semantic (cosine-similarity)
dedup via ``is_rejected_duplicate`` for documents that survive the exact
deduplication check.

Args:
db: Database session
documents: List of document creation schemas
workspace_name: Name of the workspace
observer: Name of the observing peer
observed: Name of the observed peer
deduplicate: Enable semantic duplicate detection

Returns:
List of DocumentCreate schemas that were actually inserted (excludes
Expand All @@ -456,8 +474,54 @@ async def create_documents(
# Store (document_model, embedding) pairs - IDs aren't available until after commit
docs_with_embeddings: list[tuple[models.Document, list[float]]] = []

# exact-content dedup (independent of `deduplicate`): pre-fetch
# existing live documents whose normalized content matches anything in this
# batch, scoped to (workspace, observer, observed). The SQL normalization must
# mirror _normalize_content.
batch_normalized: set[str] = {_normalize_content(d.content) for d in documents}
existing_by_normalized: dict[str, models.Document] = {}
if batch_normalized:
normalized_content_sql = func.lower(
func.regexp_replace(models.Document.content, r"^\s+|\s+$", "", "g")
)
existing_result = await db.execute(
select(models.Document).where(
models.Document.workspace_name == workspace_name,
models.Document.observer == observer,
models.Document.observed == observed,
models.Document.deleted_at.is_(None),
normalized_content_sql.in_(batch_normalized),
)
)
for existing_doc in existing_result.scalars():
# If multiple historical rows share normalized content, reinforcing
# one is sufficient; keep the first.
existing_by_normalized.setdefault(
_normalize_content(existing_doc.content), existing_doc
)

# Tracks normalized content already accepted from this batch so exact
# duplicates within a single inference call collapse to one document.
seen_in_batch: set[str] = set()

for doc in documents:
try:
normalized_content = _normalize_content(doc.content)

# Exact-match dedup, always on:
# 1) collapse exact duplicates within this batch (drop silently).
if normalized_content in seen_in_batch:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to check for dups earlier? pre-embedding? I know it won't impact the embedding quality so it would just be an optimization change.

continue
seen_in_batch.add(normalized_content)

# 2) drop exact duplicates of an existing live document, recording
# the re-derivation as reinforcement on the existing row.
existing_match = existing_by_normalized.get(normalized_content)
if existing_match is not None:
existing_match.times_derived = models.Document.times_derived + 1
await db.flush()
continue

# for each document, if deduplicate is True, perform a process
# that checks against existing documents and either rejects this document
# as a duplicate OR deletes an existing document that is a duplicate.
Expand Down
230 changes: 230 additions & 0 deletions tests/crud/test_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,236 @@ async def test_duplicate_replacement_carries_count_forward(
# Original is soft-deleted; replacement isn't inserted until create_documents runs.
assert len(live) == 0

@pytest.mark.asyncio
async def test_exact_dedup_within_batch_drops_repeat(
self,
db_session: AsyncSession,
sample_data: tuple[models.Workspace, models.Peer],
):
"""Exact (case/whitespace-insensitive) duplicates within a single batch
collapse to one document, even with semantic dedup disabled."""
test_workspace, test_peer = sample_data
test_peer2, test_session, _ = await self._setup_test_data(
db_session, test_workspace, test_peer
)

# Three "exact" matches that differ only by case/surrounding whitespace.
doc_schemas = [
schemas.DocumentCreate(
content="User likes coffee",
embedding=[0.1] * 1536,
session_name=test_session.name,
metadata=schemas.DocumentMetadata(
message_ids=[1],
message_created_at="2026-01-01T00:00:00Z",
),
),
schemas.DocumentCreate(
content="user likes coffee",
embedding=[0.2] * 1536,
session_name=test_session.name,
metadata=schemas.DocumentMetadata(
message_ids=[2],
message_created_at="2026-01-01T00:01:00Z",
),
),
schemas.DocumentCreate(
content=" User likes coffee\n",
embedding=[0.3] * 1536,
session_name=test_session.name,
metadata=schemas.DocumentMetadata(
message_ids=[3],
message_created_at="2026-01-01T00:02:00Z",
),
),
]

accepted = await crud.create_documents(
db_session,
documents=doc_schemas,
workspace_name=test_workspace.name,
observer=test_peer.name,
observed=test_peer2.name,
deduplicate=False,
)

assert len(accepted) == 1
live = (
(
await db_session.execute(
select(models.Document).where(
models.Document.workspace_name == test_workspace.name,
models.Document.observer == test_peer.name,
models.Document.observed == test_peer2.name,
models.Document.deleted_at.is_(None),
)
)
)
.scalars()
.all()
)
assert len(live) == 1
# Within-batch repeats are dropped silently, no reinforcement.
assert live[0].times_derived == 1

@pytest.mark.asyncio
async def test_exact_dedup_against_existing_reinforces(
self,
db_session: AsyncSession,
sample_data: tuple[models.Workspace, models.Peer],
):
"""An exact match of an existing live document is rejected and reinforces
the existing row, even with semantic dedup disabled."""
test_workspace, test_peer = sample_data
test_peer2, test_session, _ = await self._setup_test_data(
db_session, test_workspace, test_peer
)

await crud.create_documents(
db_session,
[
schemas.DocumentCreate(
content="User likes coffee",
embedding=[0.1] * 1536,
session_name=test_session.name,
times_derived=1,
metadata=schemas.DocumentMetadata(
message_ids=[1],
message_created_at="2026-01-01T00:00:00Z",
),
)
],
workspace_name=test_workspace.name,
observer=test_peer.name,
observed=test_peer2.name,
deduplicate=False,
)

# Case/whitespace variant of the existing content -> exact match.
accepted = await crud.create_documents(
db_session,
[
schemas.DocumentCreate(
content="user likes coffee ",
embedding=[0.9] * 1536,
session_name=test_session.name,
times_derived=1,
metadata=schemas.DocumentMetadata(
message_ids=[2],
message_created_at="2026-01-02T00:00:00Z",
),
)
],
workspace_name=test_workspace.name,
observer=test_peer.name,
observed=test_peer2.name,
deduplicate=False,
)

assert len(accepted) == 0
surviving = (
(
await db_session.execute(
select(models.Document).where(
models.Document.workspace_name == test_workspace.name,
models.Document.observer == test_peer.name,
models.Document.observed == test_peer2.name,
models.Document.deleted_at.is_(None),
)
)
)
.scalars()
.all()
)
assert len(surviving) == 1
assert surviving[0].content == "User likes coffee"
assert surviving[0].times_derived == 2

@pytest.mark.asyncio
async def test_exact_dedup_flushes_before_semantic_replacement(
self,
db_session: AsyncSession,
sample_data: tuple[models.Workspace, models.Peer],
):
"""An exact-match reinforcement in a batch must be visible to a later
semantic replacement of the same existing row when autoflush is off."""
test_workspace, test_peer = sample_data
test_peer2, test_session, _ = await self._setup_test_data(
db_session, test_workspace, test_peer
)

await crud.create_documents(
db_session,
[
schemas.DocumentCreate(
content="User likes coffee",
embedding=[0.5] * 1536,
session_name=test_session.name,
times_derived=1,
metadata=schemas.DocumentMetadata(
message_ids=[1],
message_created_at="2026-01-01T00:00:00Z",
),
)
],
workspace_name=test_workspace.name,
observer=test_peer.name,
observed=test_peer2.name,
deduplicate=False,
)

db_session.autoflush = False
accepted = await crud.create_documents(
db_session,
[
schemas.DocumentCreate(
content=" user likes coffee ",
embedding=[0.5] * 1536,
session_name=test_session.name,
times_derived=1,
metadata=schemas.DocumentMetadata(
message_ids=[2],
message_created_at="2026-01-02T00:00:00Z",
),
),
schemas.DocumentCreate(
content="User likes coffee and tea",
embedding=[0.5] * 1536,
session_name=test_session.name,
times_derived=1,
metadata=schemas.DocumentMetadata(
message_ids=[3],
message_created_at="2026-01-03T00:00:00Z",
),
),
],
workspace_name=test_workspace.name,
observer=test_peer.name,
observed=test_peer2.name,
deduplicate=True,
)

assert len(accepted) == 1
assert accepted[0].content == "User likes coffee and tea"

surviving = (
(
await db_session.execute(
select(models.Document).where(
models.Document.workspace_name == test_workspace.name,
models.Document.observer == test_peer.name,
models.Document.observed == test_peer2.name,
models.Document.deleted_at.is_(None),
)
)
)
.scalars()
.all()
)
assert len(surviving) == 1
assert surviving[0].content == "User likes coffee and tea"
assert surviving[0].times_derived == 3

@pytest.mark.asyncio
async def test_delete_document_success(
self,
Expand Down
Loading