Skip to content

Adapters

Adapters are concrete implementations of the protocol interfaces. They wrap existing chains and tools from the codebase into the pluggable workflow system.


Extractors

Claim extraction strategies implement the ClaimExtractor protocol.

SimpleExtractor

truthfulness_evaluator.strategies.extractors.SimpleExtractor

Adapter for SimpleClaimExtractionChain satisfying ClaimExtractor protocol.

Source code in src/truthfulness_evaluator/strategies/extractors/simple.py
class SimpleExtractor:
    """Adapter for SimpleClaimExtractionChain satisfying ClaimExtractor protocol."""

    def __init__(self, model: str = "gpt-4o-mini"):
        # All extraction work is delegated to the wrapped chain.
        self._chain = SimpleClaimExtractionChain(model=model)

    async def extract(
        self,
        document: str,
        source_path: str,
        *,
        max_claims: int | None = None,
    ) -> list[Claim]:
        """Extract claims using simple LLM-based extraction."""
        claims = await self._chain.extract(
            document=document, source_path=source_path, max_claims=max_claims
        )
        return claims

extract(document, source_path, *, max_claims=None) async

Extract claims using simple LLM-based extraction.

Source code in src/truthfulness_evaluator/strategies/extractors/simple.py
async def extract(
    self,
    document: str,
    source_path: str,
    *,
    max_claims: int | None = None,
) -> list[Claim]:
    """Run simple LLM-based extraction on the document via the wrapped chain."""
    claims = await self._chain.extract(
        document=document, source_path=source_path, max_claims=max_claims
    )
    return claims

TripletExtractor

truthfulness_evaluator.strategies.extractors.TripletExtractor

Adapter for TripletExtractionChain satisfying ClaimExtractor protocol.

Source code in src/truthfulness_evaluator/strategies/extractors/triplet.py
class TripletExtractor:
    """Adapter for TripletExtractionChain satisfying ClaimExtractor protocol."""

    def __init__(self, model: str = "gpt-4o-mini"):
        # Extraction is fully delegated to the wrapped triplet chain.
        self._chain = TripletExtractionChain(model=model)

    async def extract(
        self,
        document: str,
        source_path: str,
        *,
        max_claims: int | None = None,
    ) -> list[Claim]:
        """Extract claims as subject-relation-object triplets."""
        triplets = await self._chain.extract(
            document=document, source_path=source_path, max_claims=max_claims
        )
        return triplets

extract(document, source_path, *, max_claims=None) async

Extract claims as subject-relation-object triplets.

Source code in src/truthfulness_evaluator/strategies/extractors/triplet.py
async def extract(
    self,
    document: str,
    source_path: str,
    *,
    max_claims: int | None = None,
) -> list[Claim]:
    """Extract subject-relation-object triplet claims via the wrapped chain."""
    triplets = await self._chain.extract(
        document=document, source_path=source_path, max_claims=max_claims
    )
    return triplets

Gatherers

Evidence gathering strategies implement the EvidenceGatherer protocol.

WebSearchGatherer

truthfulness_evaluator.strategies.gatherers.WebSearchGatherer

Adapter wrapping WebEvidenceGatherer for the EvidenceGatherer protocol.

Source code in src/truthfulness_evaluator/strategies/gatherers/web.py
class WebSearchGatherer:
    """Adapter wrapping WebEvidenceGatherer for the EvidenceGatherer protocol."""

    def __init__(self, max_results: int = 3):
        """Initialize web search gatherer.

        Args:
            max_results: Maximum number of search results to gather
        """
        self._max_results = max_results
        self._gatherer = WebEvidenceGatherer()

    async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
        """Gather web evidence for a claim.

        Args:
            claim: The claim to find evidence for
            context: Workflow context (unused by web search)

        Returns:
            List of Evidence objects from web sources
        """
        raw_results = await self._gatherer.gather_evidence(
            claim.text, max_results=self._max_results
        )

        evidence: list[Evidence] = []
        for item in raw_results:
            if "error" in item:
                # Lazy %-style args: the message is only built if the
                # record is actually emitted.
                logger.warning("Web search error: %s", item["error"])
                continue

            evidence.append(
                Evidence(
                    source=item.get("source", "web"),
                    source_type="web",
                    content=item.get("content", "")[:1500],  # cap content length
                    relevance_score=item.get("relevance", 0.6),
                    supports_claim=None,  # not determined at gather time
                )
            )

        return evidence

__init__(max_results=3)

Initialize web search gatherer.

Parameters:

Name Type Description Default
max_results int

Maximum number of search results to gather

3
Source code in src/truthfulness_evaluator/strategies/gatherers/web.py
def __init__(self, max_results: int = 3):
    """Set up the gatherer with an upper bound on search results.

    Args:
        max_results: Maximum number of search results to gather
    """
    self._gatherer = WebEvidenceGatherer()
    self._max_results = max_results

gather(claim, context) async

Gather web evidence for a claim.

Parameters:

Name Type Description Default
claim Claim

The claim to find evidence for

required
context dict[str, Any]

Workflow context (unused by web search)

required

Returns:

Type Description
list[Evidence]

List of Evidence objects from web sources

Source code in src/truthfulness_evaluator/strategies/gatherers/web.py
async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
    """Gather web evidence for a claim.

    Args:
        claim: The claim to find evidence for
        context: Workflow context (unused by web search)

    Returns:
        List of Evidence objects from web sources
    """
    raw_results = await self._gatherer.gather_evidence(
        claim.text, max_results=self._max_results
    )

    evidence: list[Evidence] = []
    for item in raw_results:
        if "error" in item:
            # Lazy %-style args: message built only if the record is emitted.
            logger.warning("Web search error: %s", item["error"])
            continue

        evidence.append(
            Evidence(
                source=item.get("source", "web"),
                source_type="web",
                content=item.get("content", "")[:1500],  # cap content length
                relevance_score=item.get("relevance", 0.6),
                supports_claim=None,  # not determined at gather time
            )
        )

    return evidence

FilesystemGatherer

truthfulness_evaluator.strategies.gatherers.FilesystemGatherer

Adapter wrapping FilesystemEvidenceAgent for the EvidenceGatherer protocol.

Source code in src/truthfulness_evaluator/strategies/gatherers/filesystem.py
class FilesystemGatherer:
    """Adapter wrapping FilesystemEvidenceAgent for the EvidenceGatherer protocol."""

    def __init__(self, model: str = "gpt-4o"):
        """Initialize filesystem gatherer.

        Args:
            model: LLM model name for ReAct agent
        """
        self._model = model

    async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
        """Gather filesystem evidence for a claim.

        Args:
            claim: The claim to find evidence for
            context: Workflow context containing root_path

        Returns:
            List of Evidence objects from filesystem sources
        """
        root_path = context.get("root_path")
        if not root_path:
            logger.warning("No root_path in context, skipping filesystem search")
            return []

        # A fresh agent per call: root_path comes from the workflow context,
        # so it can differ between invocations.
        agent = FilesystemEvidenceAgent(root_path=root_path, model=self._model)
        raw_results = await agent.search(claim.text)

        # Pure transformation of results -> Evidence; a comprehension is the
        # idiomatic form for an append-only loop.
        return [
            Evidence(
                source=item.get("file_path", "unknown"),
                source_type="filesystem",
                content=item.get("content", "")[:1000],  # cap content length
                relevance_score=item.get("relevance", 0.5),
                supports_claim=item.get("supports"),
            )
            for item in raw_results
        ]

__init__(model='gpt-4o')

Initialize filesystem gatherer.

Parameters:

Name Type Description Default
model str

LLM model name for ReAct agent

'gpt-4o'
Source code in src/truthfulness_evaluator/strategies/gatherers/filesystem.py
def __init__(self, model: str = "gpt-4o"):
    """Remember the model name used to build per-call ReAct agents.

    Args:
        model: LLM model name for ReAct agent
    """
    self._model = model

gather(claim, context) async

Gather filesystem evidence for a claim.

Parameters:

Name Type Description Default
claim Claim

The claim to find evidence for

required
context dict[str, Any]

Workflow context containing root_path

required

Returns:

Type Description
list[Evidence]

List of Evidence objects from filesystem sources

Source code in src/truthfulness_evaluator/strategies/gatherers/filesystem.py
async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
    """Gather filesystem evidence for a claim.

    Args:
        claim: The claim to find evidence for
        context: Workflow context containing root_path

    Returns:
        List of Evidence objects from filesystem sources
    """
    root_path = context.get("root_path")
    if not root_path:
        logger.warning("No root_path in context, skipping filesystem search")
        return []

    # A fresh agent per call: root_path is taken from the workflow context.
    agent = FilesystemEvidenceAgent(root_path=root_path, model=self._model)
    raw_results = await agent.search(claim.text)

    # Append-only loop expressed as a comprehension (idiomatic, PERF401).
    return [
        Evidence(
            source=item.get("file_path", "unknown"),
            source_type="filesystem",
            content=item.get("content", "")[:1000],  # cap content length
            relevance_score=item.get("relevance", 0.5),
            supports_claim=item.get("supports"),
        )
        for item in raw_results
    ]

CompositeGatherer

truthfulness_evaluator.strategies.gatherers.CompositeGatherer

Runs multiple evidence gatherers in parallel and combines results.

Source code in src/truthfulness_evaluator/strategies/gatherers/composite.py
class CompositeGatherer:
    """Runs multiple evidence gatherers in parallel and combines results."""

    def __init__(
        self,
        gatherers: list[EvidenceGatherer],
        *,
        max_total_evidence: int = 10,
        deduplicate: bool = True,
    ):
        """Initialize composite gatherer.

        Args:
            gatherers: List of gatherer instances to run in parallel
            max_total_evidence: Maximum total evidence items to return
            deduplicate: Whether to remove duplicate evidence by source
        """
        self._gatherers = gatherers
        self._max_total_evidence = max_total_evidence
        self._deduplicate = deduplicate

    async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
        """Gather evidence from all configured gatherers in parallel.

        Args:
            claim: The claim to find evidence for
            context: Workflow context passed to all gatherers

        Returns:
            Combined, deduplicated, and sorted evidence list
        """
        tasks = [gatherer.gather(claim, context) for gatherer in self._gatherers]
        # return_exceptions=True: one failing gatherer must not sink the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_evidence: list[Evidence] = []
        # zip pairs each result with the gatherer that produced it (gather()
        # preserves input order), avoiding index bookkeeping.
        for gatherer, result in zip(self._gatherers, results):
            if isinstance(result, Exception):
                logger.warning(
                    "Gatherer %s failed: %s", type(gatherer).__name__, result
                )
                continue
            all_evidence.extend(result)

        if self._deduplicate:
            all_evidence = self._dedupe(all_evidence)

        # Most relevant first, then truncate to the configured cap.
        all_evidence.sort(key=lambda e: e.relevance_score, reverse=True)

        return all_evidence[: self._max_total_evidence]

    @staticmethod
    def _dedupe(evidence_list: list[Evidence]) -> list[Evidence]:
        """Remove duplicate evidence by source field.

        Args:
            evidence_list: List of evidence items to deduplicate

        Returns:
            Deduplicated list (preserves first occurrence)
        """
        seen: set = set()
        deduped: list[Evidence] = []
        for evidence in evidence_list:
            if evidence.source not in seen:
                seen.add(evidence.source)
                deduped.append(evidence)
        return deduped

__init__(gatherers, *, max_total_evidence=10, deduplicate=True)

Initialize composite gatherer.

Parameters:

Name Type Description Default
gatherers list[EvidenceGatherer]

List of gatherer instances to run in parallel

required
max_total_evidence int

Maximum total evidence items to return

10
deduplicate bool

Whether to remove duplicate evidence by source

True
Source code in src/truthfulness_evaluator/strategies/gatherers/composite.py
def __init__(
    self,
    gatherers: list[EvidenceGatherer],
    *,
    max_total_evidence: int = 10,
    deduplicate: bool = True,
):
    """Configure the composite gatherer.

    Args:
        gatherers: List of gatherer instances to run in parallel
        max_total_evidence: Maximum total evidence items to return
        deduplicate: Whether to remove duplicate evidence by source
    """
    self._deduplicate = deduplicate
    self._max_total_evidence = max_total_evidence
    self._gatherers = gatherers

gather(claim, context) async

Gather evidence from all configured gatherers in parallel.

Parameters:

Name Type Description Default
claim Claim

The claim to find evidence for

required
context dict[str, Any]

Workflow context passed to all gatherers

required

Returns:

Type Description
list[Evidence]

Combined, deduplicated, and sorted evidence list

Source code in src/truthfulness_evaluator/strategies/gatherers/composite.py
async def gather(self, claim: Claim, context: dict[str, Any]) -> list[Evidence]:
    """Gather evidence from all configured gatherers in parallel.

    Args:
        claim: The claim to find evidence for
        context: Workflow context passed to all gatherers

    Returns:
        Combined, deduplicated, and sorted evidence list
    """
    tasks = [gatherer.gather(claim, context) for gatherer in self._gatherers]
    # return_exceptions=True: one failing gatherer must not sink the rest.
    results = await asyncio.gather(*tasks, return_exceptions=True)

    all_evidence: list[Evidence] = []
    # zip pairs each result with its gatherer (gather() preserves order),
    # avoiding manual index bookkeeping.
    for gatherer, result in zip(self._gatherers, results):
        if isinstance(result, Exception):
            logger.warning("Gatherer %s failed: %s", type(gatherer).__name__, result)
            continue
        all_evidence.extend(result)

    if self._deduplicate:
        all_evidence = self._dedupe(all_evidence)

    # Most relevant first, then truncate to the configured cap.
    all_evidence.sort(key=lambda e: e.relevance_score, reverse=True)

    return all_evidence[: self._max_total_evidence]

Verifiers

Claim verification strategies implement the ClaimVerifier protocol.

SingleModelVerifier

truthfulness_evaluator.strategies.verifiers.SingleModelVerifier

Adapter for single-model claim verification using VerificationChain.

Source code in src/truthfulness_evaluator/strategies/verifiers/single_model.py
class SingleModelVerifier:
    """Adapter for single-model claim verification using VerificationChain."""

    def __init__(self, model: str = "gpt-4o"):
        # Verification logic lives entirely in the wrapped chain.
        self._chain = VerificationChain(model_name=model)

    async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
        """Verify a claim against evidence using a single LLM."""
        result = await self._chain.verify(claim, evidence)
        return result

verify(claim, evidence) async

Verify a claim against evidence using a single LLM.

Source code in src/truthfulness_evaluator/strategies/verifiers/single_model.py
async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
    """Check the claim against the evidence with the wrapped single-LLM chain."""
    result = await self._chain.verify(claim, evidence)
    return result

ConsensusVerifier

truthfulness_evaluator.strategies.verifiers.ConsensusVerifier

Adapter for multi-model consensus verification using ConsensusChain.

Source code in src/truthfulness_evaluator/strategies/verifiers/consensus.py
class ConsensusVerifier:
    """Adapter for multi-model consensus verification using ConsensusChain."""

    def __init__(
        self,
        models: list[str],
        *,
        weights: dict[str, float] | None = None,
        confidence_threshold: float = 0.7,
    ):
        # Configuration is passed straight through to the consensus chain.
        self._chain = ConsensusChain(
            model_names=models,
            weights=weights,
            confidence_threshold=confidence_threshold,
        )

    async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
        """Verify a claim using multi-model consensus."""
        outcome = await self._chain.verify(claim, evidence)
        return outcome

verify(claim, evidence) async

Verify a claim using multi-model consensus.

Source code in src/truthfulness_evaluator/strategies/verifiers/consensus.py
async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
    """Delegate verification of the claim to the multi-model consensus chain."""
    outcome = await self._chain.verify(claim, evidence)
    return outcome

ICE Consensus Status

The iterative critique/revise rounds for ICE (Iterative Consensus Ensemble) are currently stubbed. The implementation uses weighted voting but does not yet perform full debate-style refinement.

InternalVerifier

truthfulness_evaluator.strategies.verifiers.InternalVerifier

Adapter for internal/codebase verification using InternalVerificationChain.

Source code in src/truthfulness_evaluator/strategies/verifiers/internal.py
class InternalVerifier:
    """Adapter for internal/codebase verification using InternalVerificationChain."""

    def __init__(
        self,
        root_path: str,
        *,
        model: str = "gpt-4o",
        classification_model: str = "gpt-4o-mini",
    ):
        """Initialize internal verifier.

        Args:
            root_path: Root of the codebase claims are verified against
            model: LLM model for the internal verification chain
            classification_model: LLM model for claim-type classification
        """
        self._chain = InternalVerificationChain(root_path=root_path, model=model)
        self._classifier = ClaimClassifier(model=classification_model)

    async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
        """Verify a claim against the codebase after classifying it.

        Claims classified as external facts (or unclassifiable) are short-
        circuited with NOT_ENOUGH_INFO; any pipeline error degrades to the
        same verdict instead of propagating.
        """
        try:
            classification = await self._classifier.classify(claim)

            if classification.claim_type in ("external_fact", "unknown"):
                return self._insufficient(
                    claim.id,
                    f"Claim classified as '{classification.claim_type}' -- "
                    "not suitable for internal verification.",
                )

            return await self._chain.verify(claim, classification)

        except Exception as e:
            # The try block covers both classification AND chain verification,
            # so the diagnostic must not blame classification alone (the old
            # "Classification failed/error" message was misleading).
            logger.warning("Internal verification failed for claim %s: %s", claim.id, e)
            return self._insufficient(claim.id, f"Internal verification error: {e}")

    @staticmethod
    def _insufficient(claim_id, explanation: str) -> VerificationResult:
        """Build a zero-confidence NOT_ENOUGH_INFO result with no evidence."""
        return VerificationResult(
            claim_id=claim_id,
            verdict="NOT_ENOUGH_INFO",
            confidence=0.0,
            evidence=[],
            explanation=explanation,
            model_votes={},
        )

verify(claim, evidence) async

Verify a claim against the codebase after classifying it.

Source code in src/truthfulness_evaluator/strategies/verifiers/internal.py
async def verify(self, claim: Claim, evidence: list[Evidence]) -> VerificationResult:
    """Verify a claim against the codebase after classifying it.

    External-fact or unclassifiable claims short-circuit to NOT_ENOUGH_INFO;
    any pipeline error degrades to the same verdict instead of propagating.
    """
    try:
        classification = await self._classifier.classify(claim)

        if classification.claim_type in ("external_fact", "unknown"):
            return VerificationResult(
                claim_id=claim.id,
                verdict="NOT_ENOUGH_INFO",
                confidence=0.0,
                evidence=[],
                explanation=(
                    f"Claim classified as '{classification.claim_type}' -- "
                    "not suitable for internal verification."
                ),
                model_votes={},
            )

        return await self._chain.verify(claim, classification)

    except Exception as e:
        # This handler covers both classification AND chain verification, so
        # the diagnostic must not blame classification alone (the old
        # "Classification failed/error" message was misleading).
        logger.warning("Internal verification failed for claim %s: %s", claim.id, e)
        return VerificationResult(
            claim_id=claim.id,
            verdict="NOT_ENOUGH_INFO",
            confidence=0.0,
            evidence=[],
            explanation=f"Internal verification error: {e}",
            model_votes={},
        )

Formatters

Report formatting strategies implement the ReportFormatter protocol.

JsonFormatter

truthfulness_evaluator.strategies.formatters.JsonFormatter

Formats truthfulness reports as JSON.

Source code in src/truthfulness_evaluator/strategies/formatters/json_fmt.py
class JsonFormatter:
    """Formats truthfulness reports as JSON."""

    def __init__(self, indent: int = 2):
        # Indent width forwarded to the report's JSON serializer.
        self._indent = indent

    def format(self, report: TruthfulnessReport) -> str:
        """Serialize the report to an indented JSON string."""
        indent = self._indent
        return report.model_dump_json(indent=indent)

    def file_extension(self) -> str:
        """Extension used when writing JSON reports to disk."""
        return ".json"

file_extension()

Return the file extension for JSON format.

Source code in src/truthfulness_evaluator/strategies/formatters/json_fmt.py
def file_extension(self) -> str:
    """Extension used when writing JSON reports to disk."""
    return ".json"

format(report)

Format a truthfulness report as JSON.

Source code in src/truthfulness_evaluator/strategies/formatters/json_fmt.py
def format(self, report: TruthfulnessReport) -> str:
    """Serialize the report to JSON using the configured indent width."""
    indent = self._indent
    return report.model_dump_json(indent=indent)

MarkdownFormatter

truthfulness_evaluator.strategies.formatters.MarkdownFormatter

Formats truthfulness reports as Markdown.

Source code in src/truthfulness_evaluator/strategies/formatters/markdown.py
class MarkdownFormatter:
    """Formats truthfulness reports as Markdown."""

    def format(self, report: TruthfulnessReport) -> str:
        """Render the report as Markdown text."""
        # Local import, as in the original; presumably avoids an import
        # cycle with the reporting package -- TODO confirm.
        from ...reporting.generator import ReportGenerator

        return ReportGenerator(report).to_markdown()

    def file_extension(self) -> str:
        """Extension used when writing Markdown reports to disk."""
        return ".md"

file_extension()

Return the file extension for Markdown format.

Source code in src/truthfulness_evaluator/strategies/formatters/markdown.py
def file_extension(self) -> str:
    """Extension used when writing Markdown reports to disk."""
    return ".md"

format(report)

Format a truthfulness report as Markdown.

Source code in src/truthfulness_evaluator/strategies/formatters/markdown.py
def format(self, report: TruthfulnessReport) -> str:
    """Render the report as Markdown text."""
    # Local import, as in the original; presumably avoids an import
    # cycle with the reporting package -- TODO confirm.
    from ...reporting.generator import ReportGenerator

    return ReportGenerator(report).to_markdown()

HtmlFormatter

truthfulness_evaluator.strategies.formatters.HtmlFormatter

Formats truthfulness reports as HTML.

Source code in src/truthfulness_evaluator/strategies/formatters/html.py
class HtmlFormatter:
    """Formats truthfulness reports as HTML."""

    def format(self, report: TruthfulnessReport) -> str:
        """Render the report as an HTML document string."""
        # Local import, as in the original; presumably avoids an import
        # cycle with the reporting package -- TODO confirm.
        from ...reporting.generator import ReportGenerator

        return ReportGenerator(report).to_html()

    def file_extension(self) -> str:
        """Extension used when writing HTML reports to disk."""
        return ".html"

file_extension()

Return the file extension for HTML format.

Source code in src/truthfulness_evaluator/strategies/formatters/html.py
def file_extension(self) -> str:
    """Extension used when writing HTML reports to disk."""
    return ".html"

format(report)

Format a truthfulness report as HTML.

Source code in src/truthfulness_evaluator/strategies/formatters/html.py
def format(self, report: TruthfulnessReport) -> str:
    """Render the report as an HTML document string."""
    # Local import, as in the original; presumably avoids an import
    # cycle with the reporting package -- TODO confirm.
    from ...reporting.generator import ReportGenerator

    return ReportGenerator(report).to_html()