Feat/pre aligned commentary serializer with pecha display #299

Open
wants to merge 51 commits into
base: main
Changes from all commits (51 commits)
a8d0805
add segmentation pecha display layer to commentary pecha
tenzin3 May 1, 2025
f454359
Replace AnnotationStore with load_layer for loading layer files
tenzin3 May 1, 2025
94ac7e7
Rename transfer_layer to alignment_layer for better clarity
tenzin3 May 1, 2025
89dbd97
Refactor translation transfer to use get_anns and improve span handling
tenzin3 May 1, 2025
4846d89
Remove unnecessary blank lines in translation_transfer.py
tenzin3 May 1, 2025
d7a92e4
Add translation_display_id param to get_translation_pechas_mapping me…
tenzin3 May 1, 2025
7e4a579
Rename mapping var to map and fix display layer path in translation m…
tenzin3 May 1, 2025
ae2a07d
refactor: simplify mapping functions and improve variable naming clarity
tenzin3 May 1, 2025
584ec55
refactor: rename display_layer_path to segmentation_ann_path and sort…
tenzin3 May 1, 2025
f71f009
Refactor translation layer loading and variable naming in translation…
tenzin3 May 1, 2025
1af94a6
Rename variables for better clarity and consistency in translation_tr…
tenzin3 May 1, 2025
cf64a75
Improve variable names and add type hints in translation transfer fun…
tenzin3 May 1, 2025
8277fef
Improve serialization docstrings for translation alignment and segmen…
tenzin3 May 1, 2025
15be984
Rename translation serialization methods for clarity and remove test …
tenzin3 May 1, 2025
7baf131
Rename display layer to segmentation layer and refactor mapping function
tenzin3 May 1, 2025
6b6c7ff
Refactor annotation mapping logic with improved type hints and variab…
tenzin3 May 1, 2025
9d035ad
Refactor extract_root_anns to ann_to_dict for simpler annotation proc…
tenzin3 May 1, 2025
62b8c4f
Rename ann_to_dict method to index_annotations_by_root for better cla…
tenzin3 May 1, 2025
b18bf91
Rename display_layer variables to segmentation for clarity and consis…
tenzin3 May 1, 2025
ddcd874
Refactor commentary transfer with improved type hints and code organi…
tenzin3 May 1, 2025
7d296cd
Improve docstring formatting for better readability
tenzin3 May 2, 2025
9c438a3
Rename serialized_content to res and improve code formatting in comme…
tenzin3 May 2, 2025
51e6ae2
Reorder root index parsing to avoid unnecessary processing for empty …
tenzin3 May 2, 2025
a9b90a5
Rename translation_alignment_layer var to layer and add test executio…
tenzin3 May 2, 2025
f62ca41
Refactor layer loading in translation_transfer to improve readability
tenzin3 May 2, 2025
061f924
Remove test code and simplify root segmentation mapping logic
tenzin3 May 2, 2025
55ae55a
Refactor translation transfer with code reuse and add test execution …
tenzin3 May 2, 2025
a71702d
Rename root_idx to aligned_idx for better clarity in translation tran…
tenzin3 May 2, 2025
35c5c46
Rename get_serialized_from_mapping to mapping_to_text_list for better…
tenzin3 May 2, 2025
06aaad3
Add return type annotation for mapping_to_text_list and remove test c…
tenzin3 May 2, 2025
9138879
Rename get_chapter_num_from_segment_num to get_chapter_for_segment fo…
tenzin3 May 2, 2025
00f3505
Rename process_segment_num_for_chapter to adjust_segment_num_for_chapter
tenzin3 May 2, 2025
2ea9288
Refactor commentary transfer with helper functions for root index val…
tenzin3 May 2, 2025
43a9e72
Refactor get_first_valid_root_idx into static method for better reusa…
tenzin3 May 2, 2025
e7255ca
Refactor validation logic into reusable is_valid_ann method
tenzin3 May 2, 2025
182d9b0
Refactor commentary processing into smaller methods for better readab…
tenzin3 May 2, 2025
845f750
Add method to handle multiple root indices and get commentary pechas …
tenzin3 May 2, 2025
0a6c0ef
Fix root index mapping handling and update test expected values
tenzin3 May 2, 2025
1b3f10a
Refactor annotation mapping with helper functions and improved error …
tenzin3 May 2, 2025
7e3f8c9
Rename display_id to segmentation_id for better clarity in translatio…
tenzin3 May 2, 2025
4fe0c66
Add method to serialize commentary segments with display index mapping
tenzin3 May 2, 2025
946c11f
Rename translation_display_id to translation_segmentation_id for cons…
tenzin3 May 5, 2025
8002206
Add support for commentary segmentation in prealigned commentary seri…
tenzin3 May 5, 2025
091f143
Add condition to handle translation segmentation id in prealigned roo…
tenzin3 May 5, 2025
619d4dd
Make alignment_id optional in PechaAlignment and fix alignment serial…
tenzin3 May 5, 2025
a56c203
Refactor segmentation check into dedicated function for better readab…
tenzin3 May 5, 2025
e2b4a68
Fix root alignment ID retrieval using annotation path in commentary s…
tenzin3 May 5, 2025
a472675
Fix annotation path comparison in segmentation check
tenzin3 May 5, 2025
43133d6
Move is_segmentation_annotation function outside of _serialize_preali…
tenzin3 May 5, 2025
c9db827
Add support for prealigned commentary with segmentation annotation
tenzin3 May 5, 2025
7d49000
Fix annotation ID generation in Pecha class
tenzin3 May 6, 2025
293 changes: 184 additions & 109 deletions src/openpecha/alignment/commentary_transfer.py
@@ -1,36 +1,61 @@
from typing import Dict, List
from pathlib import Path
from typing import Any, Dict, List

from stam import AnnotationStore

from openpecha.config import get_logger
from openpecha.pecha import Pecha, get_anns
from openpecha.utils import (
get_chapter_num_from_segment_num,
process_segment_num_for_chapter,
)
from openpecha.pecha import Pecha, get_anns, load_layer
from openpecha.utils import adjust_segment_num_for_chapter, get_chapter_for_segment

logger = get_logger(__name__)


def is_empty(text: str) -> bool:
"""
Return True if text is empty or contains only newlines.
"""
return not text.strip().replace("\n", "")


def parse_root_mapping(mapping: str) -> List[int]:
"""
Parse root_idx_mapping string like '1,2-4' into a sorted list of ints.
"""
res = []
for part in mapping.strip().split(","):
part = part.strip()
if "-" in part:
start, end = part.split("-")
res.extend(list(range(int(start), int(end) + 1)))
else:
res.append(int(part))
res.sort()
return res
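The mapping grammar handled above ('1,2-4'-style strings mixing single indices and inclusive ranges) is easy to check in isolation. This is the same parser as in the diff, copied standalone so it can be run directly:

```python
from typing import List


def parse_root_mapping(mapping: str) -> List[int]:
    """Parse a root_idx_mapping string like '1,2-4' into a sorted list of ints."""
    res: List[int] = []
    for part in mapping.strip().split(","):
        part = part.strip()
        if "-" in part:
            start, end = part.split("-")
            res.extend(range(int(start), int(end) + 1))
        else:
            res.append(int(part))
    res.sort()
    return res


print(parse_root_mapping("1,2-4"))   # [1, 2, 3, 4]
print(parse_root_mapping("5-6, 2"))  # [2, 5, 6]
```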


class CommentaryAlignmentTransfer:
def get_display_layer_path(self, pecha: Pecha) -> Pecha:
@staticmethod
def get_first_valid_root_idx(ann) -> int | None:
indices = parse_root_mapping(ann["root_idx_mapping"])
return indices[0] if indices else None

@staticmethod
def is_valid_ann(anns: Dict[int, Dict[str, Any]], idx: int) -> bool:
return idx in anns and not is_empty(anns[idx]["text"])

def get_segmentation_ann_path(self, pecha: Pecha) -> Path:
"""
Return the path to the first segmentation layer JSON file in the pecha.
"""
return next(pecha.layer_path.rglob("segmentation-*.json"))

def extract_root_anns(self, layer: AnnotationStore) -> Dict[int, Dict]:
def index_annotations_by_root(
self, anns: List[Dict[str, Any]]
) -> Dict[int, Dict[str, Any]]:
"""
Extract annotations from a STAM layer into a dictionary keyed by root index mapping.
Return a dict mapping root_idx_mapping to the annotation dict.
"""
anns = {}
for ann in layer.annotations():
start, end = ann.offset().begin().value(), ann.offset().end().value()
ann_metadata = {data.key().id(): str(data.value()) for data in ann}
root_idx = int(ann_metadata["root_idx_mapping"])
anns[root_idx] = {
"Span": {"start": start, "end": end},
"text": str(ann),
"root_idx_mapping": root_idx,
}
return anns
return {int(ann["root_idx_mapping"]): ann for ann in anns}
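The new `index_annotations_by_root` reduces the old span-extraction loop to a dict comprehension over the output of `get_anns`. A minimal standalone sketch (the annotation dicts here are hypothetical stand-ins for what `get_anns` returns):

```python
from typing import Any, Dict, List


def index_annotations_by_root(anns: List[Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
    # Key each annotation dict by its integer root_idx_mapping.
    return {int(ann["root_idx_mapping"]): ann for ann in anns}


anns = [
    {"root_idx_mapping": "2", "text": "bar"},
    {"root_idx_mapping": "1", "text": "foo"},
]
indexed = index_annotations_by_root(anns)
print(indexed[1]["text"])  # foo
```

Note that this assumes `root_idx_mapping` holds a single integer; range-style mappings like '2-4' would raise `ValueError` here, which is why `map_layer_to_layer` routes those through `get_first_valid_root_idx` instead.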

def map_layer_to_layer(
self, src_layer: AnnotationStore, tgt_layer: AnnotationStore
@@ -39,45 +64,68 @@ def map_layer_to_layer(
Map annotations from src_layer to tgt_layer based on span overlap or containment.
Returns a mapping from source indices to lists of target indices.
"""
mapping: Dict = {}

src_anns = self.extract_root_anns(src_layer)
tgt_anns = self.extract_root_anns(tgt_layer)

for src_idx, src_span in src_anns.items():
src_start, src_end = src_span["Span"]["start"], src_span["Span"]["end"]
def extract_idx(ann: dict) -> int:
"""Helper to extract a single int index from root_idx_mapping."""
if "-" in ann["root_idx_mapping"] or "," in ann["root_idx_mapping"]:
idx = self.get_first_valid_root_idx(ann)
if idx is None:
raise ValueError(
f"Invalid root_idx_mapping: {ann['root_idx_mapping']}"
)
return idx
return int(ann["root_idx_mapping"])

def is_match(src_start, src_end, tgt_start, tgt_end):
"""Helper to check if spans overlap or are contained (not edge overlap)."""
is_overlap = (
src_start <= tgt_start < src_end or src_start < tgt_end <= src_end
)
is_contained = tgt_start < src_start and tgt_end > src_end
is_edge_overlap = tgt_start == src_end or tgt_end == src_start
return (is_overlap or is_contained) and not is_edge_overlap

mapping: Dict[int, List[int]] = {}
src_anns = get_anns(src_layer, include_span=True)
tgt_anns = get_anns(tgt_layer, include_span=True)
for src_ann in src_anns:
src_start, src_end = src_ann["Span"]["start"], src_ann["Span"]["end"]
try:
src_idx = extract_idx(src_ann)
except ValueError:
continue
mapping[src_idx] = []

for tgt_idx, tgt_span in tgt_anns.items():
tgt_start, tgt_end = tgt_span["Span"]["start"], tgt_span["Span"]["end"]

# Check for mapping conditions
is_overlap = (
src_start <= tgt_start < src_end or src_start < tgt_end <= src_end
)
is_contained = tgt_start < src_start and tgt_end > src_end
is_edge_overlap = tgt_start == src_end or tgt_end == src_start
if is_overlap or is_contained and not is_edge_overlap:
for tgt_ann in tgt_anns:
tgt_start, tgt_end = tgt_ann["Span"]["start"], tgt_ann["Span"]["end"]
try:
tgt_idx = extract_idx(tgt_ann)
except ValueError:
continue
if is_match(src_start, src_end, tgt_start, tgt_end):
mapping[src_idx].append(tgt_idx)

# Sort the mapping by source indices
return dict(sorted(mapping.items()))
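The span predicate used above is the heart of the mapping: a target annotation matches a source annotation when their spans overlap or the target contains the source, but a mere shared boundary does not count. The helper can be exercised on its own:

```python
def is_match(src_start: int, src_end: int, tgt_start: int, tgt_end: int) -> bool:
    # Spans match when they overlap or the target contains the source,
    # excluding pure edge-touching (shared boundary only).
    is_overlap = src_start <= tgt_start < src_end or src_start < tgt_end <= src_end
    is_contained = tgt_start < src_start and tgt_end > src_end
    is_edge_overlap = tgt_start == src_end or tgt_end == src_start
    return (is_overlap or is_contained) and not is_edge_overlap


print(is_match(0, 10, 5, 15))   # True: partial overlap
print(is_match(0, 10, 10, 20))  # False: edge-touching only
print(is_match(5, 8, 0, 20))    # True: target contains source
```

Parenthesizing `(is_overlap or is_contained) and not is_edge_overlap` matters: without the parentheses (as in the old code), `and` binds tighter than `or`, so an edge-touching span that also overlapped would still match.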

def get_root_pechas_mapping(
self, root_pecha: Pecha, root_alignment_id: str
) -> Dict[int, List]:
self, pecha: Pecha, alignment_id: str
) -> Dict[int, List[int]]:
"""
Get segmentation mapping from root_pecha -> root_display_pecha
Get mapping from pecha's alignment layer to segmentation layer.
"""
display_layer_path = self.get_display_layer_path(root_pecha)

display_layer = AnnotationStore(file=str(display_layer_path))
transfer_layer = AnnotationStore(
file=str(root_pecha.layer_path / root_alignment_id)
)
segmentation_ann_path = self.get_segmentation_ann_path(pecha)
segmentation_layer = load_layer(segmentation_ann_path)
alignment_layer = load_layer(pecha.layer_path / alignment_id)
return self.map_layer_to_layer(alignment_layer, segmentation_layer)

map = self.map_layer_to_layer(transfer_layer, display_layer)
return map
def get_commentary_pechas_mapping(
self, pecha: Pecha, alignment_id: str, segmentation_id: str
) -> Dict[int, List[int]]:
"""
Get mapping from pecha's segmentation layer to alignment layer.
"""
segmentation_ann_path = pecha.layer_path / segmentation_id
segmentation_layer = load_layer(segmentation_ann_path)
alignment_layer = load_layer(pecha.layer_path / alignment_id)
return self.map_layer_to_layer(segmentation_layer, alignment_layer)

def get_serialized_commentary(
self,
@@ -86,81 +134,108 @@ def get_serialized_commentary(
commentary_pecha: Pecha,
commentary_alignment_id: str,
) -> List[str]:
def is_empty(text):
"""Check if text is empty or contains only newlines."""
return not text.strip().replace("\n", "")

"""
Serialize commentary annotations with root/segmentation mapping and formatting.
"""
root_map = self.get_root_pechas_mapping(root_pecha, root_alignment_id)

root_display_layer_path = self.get_display_layer_path(root_pecha)
root_display_anns = self.extract_root_anns(
AnnotationStore(file=str(root_display_layer_path))
root_segmentation_path = self.get_segmentation_ann_path(root_pecha)
root_segmentation_anns = self.index_annotations_by_root(
get_anns(load_layer(root_segmentation_path))
)

root_anns = self.extract_root_anns(
AnnotationStore(file=str(root_pecha.layer_path / root_alignment_id))
root_anns = self.index_annotations_by_root(
get_anns(load_layer(root_pecha.layer_path / root_alignment_id))
)

commentary_anns = get_anns(
AnnotationStore(
file=str(commentary_pecha.layer_path / commentary_alignment_id)
)
load_layer(commentary_pecha.layer_path / commentary_alignment_id)
)
serialized_content = []

res: List[str] = []
for ann in commentary_anns:
root_indices = parse_root_mapping(ann["root_idx_mapping"])
root_idx = root_indices[0]
commentary_text = ann["text"]
result = self.process_commentary_ann(
ann, root_anns, root_map, root_segmentation_anns
)
if result is not None:
res.append(result)
return res

# Skip if commentary is empty
is_commentary_empty = is_empty(commentary_text)
if is_commentary_empty:
continue
def get_serialized_commentary_segmentation(
self,
root_pecha: Pecha,
root_alignment_id: str,
commentary_pecha: Pecha,
commentary_alignment_id: str,
commentary_segmentation_id: str,
) -> List[str]:
root_map = self.get_root_pechas_mapping(root_pecha, root_alignment_id)
commentary_map = self.get_commentary_pechas_mapping(
commentary_pecha, commentary_alignment_id, commentary_segmentation_id
)

# Don't include mapping if root is empty
idx_not_in_root = root_idx not in root_anns
if idx_not_in_root:
serialized_content.append(commentary_text)
continue
root_segmentation_path = self.get_segmentation_ann_path(root_pecha)
root_segmentation_anns = self.index_annotations_by_root(
get_anns(load_layer(root_segmentation_path))
)
root_anns = self.index_annotations_by_root(
get_anns(load_layer(root_pecha.layer_path / root_alignment_id))
)
commentary_segmentation_anns = get_anns(
load_layer(commentary_pecha.layer_path / commentary_segmentation_id)
)

is_root_empty = is_empty(root_anns[root_idx]["text"])
if is_root_empty:
serialized_content.append(commentary_text)
res: List[str] = []
for ann in commentary_segmentation_anns:
text = ann["text"]
if is_empty(text):
continue

# Don't include mapping if root_display is empty
root_display_idx = root_map[root_idx][0]
idx_not_in_root_display = root_display_idx not in root_display_anns
if idx_not_in_root_display:
serialized_content.append(commentary_text)
continue
aligned_idx = commentary_map[int(ann["root_idx_mapping"])][0]

is_root_display_empty = is_empty(
root_display_anns[root_display_idx]["text"]
)
if is_root_display_empty:
serialized_content.append(commentary_text)
continue
if not self.is_valid_ann(root_anns, aligned_idx):
res.append(text)
continue

root_display_idx = root_map[aligned_idx][0]
if not self.is_valid_ann(root_segmentation_anns, root_display_idx):
res.append(text)
continue

chapter_num = get_chapter_num_from_segment_num(root_display_idx)
processed_root_display_idx = process_segment_num_for_chapter(
chapter_num = get_chapter_for_segment(root_display_idx)
processed_root_display_idx = adjust_segment_num_for_chapter(
root_display_idx
)
serialized_content.append(
f"<{chapter_num}><{processed_root_display_idx}>{commentary_text}"
res.append(
self.format_serialized_commentary(
chapter_num, processed_root_display_idx, text
)
)
return serialized_content

return res

def parse_root_mapping(mapping: str) -> List[int]:
res = []
for map in mapping.strip().split(","):
map = map.strip()
if "-" in map:
start, end = map.split("-")
res.extend(list(range(int(start), int(end) + 1)))
else:
res.append(int(map))
@staticmethod
def format_serialized_commentary(chapter_num: int, seg_idx: int, text: str) -> str:
"""Format the serialized commentary string."""
return f"<{chapter_num}><{seg_idx}>{text}"
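The serialized output format is a chapter/segment tag prefix on the commentary text. The static method is trivially runnable on its own, copied from the diff:

```python
def format_serialized_commentary(chapter_num: int, seg_idx: int, text: str) -> str:
    """Format the serialized commentary string: <chapter><segment>text."""
    return f"<{chapter_num}><{seg_idx}>{text}"


print(format_serialized_commentary(1, 3, "some commentary"))  # <1><3>some commentary
```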

res.sort()
return res
def process_commentary_ann(
self,
ann: dict,
root_anns: dict,
root_map: dict,
root_segmentation_anns: dict,
) -> str | None:
"""Process a single commentary annotation and return the serialized string, or None if not valid."""
commentary_text = ann["text"]
if is_empty(commentary_text):
return None

root_idx = self.get_first_valid_root_idx(ann)
if root_idx is None or not self.is_valid_ann(root_anns, root_idx):
return commentary_text

root_display_idx = root_map[root_idx][0]
if not self.is_valid_ann(root_segmentation_anns, root_display_idx):
return commentary_text

chapter_num = get_chapter_for_segment(root_display_idx)
processed_root_display_idx = adjust_segment_num_for_chapter(root_display_idx)
return self.format_serialized_commentary(
chapter_num, processed_root_display_idx, commentary_text
)