vCon Adapter Development Guide

A comprehensive guide for building adapters that convert conversation data from various systems into the standardized vCon format using the vCon Python library.

Table of Contents

Introduction to vCon Adapters

vCon adapters are specialized components that transform conversation data from various sources (call centers, chat systems, video conferencing platforms, etc.) into the standardized vCon format. This enables interoperability between different conversation systems and provides a unified way to store, analyze, and exchange conversation data.

What vCon Adapters Do

  • Extract conversation data from source systems

  • Transform data into vCon-compliant structures

  • Validate the resulting vCon objects

  • Export vCons for storage or further processing

Common Use Cases

  • Call Center Integration: Convert PBX call records to vCons

  • Chat Platform Export: Transform Slack/Teams conversations

  • Video Conference Processing: Convert Zoom/WebEx recordings

  • Email Threading: Convert email chains to conversation format

  • Social Media Monitoring: Transform social interactions

Adapter Architecture

Basic Adapter Structure

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

from vcon import Vcon, Party, Dialog

class BaseVconAdapter(ABC):
    """Abstract extract -> transform -> validate pipeline shared by all adapters.

    Subclasses supply ``extract_data`` and ``transform_to_vcon``; ``process``
    chains them and rejects any vCon that fails validation.
    """

    def __init__(self, config: Dict[str, Any]):
        # Errors reported by the most recent validate_vcon() call.
        self.validation_errors = []
        self.config = config

    @abstractmethod
    def extract_data(self, source: Any) -> Dict[str, Any]:
        """Return the raw data pulled from ``source``."""

    @abstractmethod
    def transform_to_vcon(self, raw_data: Dict[str, Any]) -> Vcon:
        """Build a vCon object from ``raw_data``."""

    def validate_vcon(self, vcon: Vcon) -> bool:
        """Run ``vcon.is_valid()``, remember its errors, and return the verdict."""
        ok, self.validation_errors = vcon.is_valid()
        return ok

    def process(self, source: Any) -> Vcon:
        """Extract, transform and validate ``source``.

        Raises:
            ValueError: if the generated vCon does not validate.
        """
        vcon = self.transform_to_vcon(self.extract_data(source))
        if self.validate_vcon(vcon):
            return vcon
        raise ValueError(f"Invalid vCon generated: {self.validation_errors}")

Modular Design Pattern

class VconAdapterPipeline:
    """Pipeline for processing conversation data through multiple stages.

    Each stage is a list of components run in registration order:
    extractors expose ``process(data)``, transformers expose
    ``apply(vcon, data)`` and validators expose ``validate(vcon)``.
    """

    def __init__(self):
        self.extractors = []
        self.transformers = []
        self.validators = []
        self.exporters = []

    def add_extractor(self, extractor):
        """Register an extractor; it must provide ``process(source_data)``."""
        self.extractors.append(extractor)

    def add_transformer(self, transformer):
        """Register a transformer; it must provide ``apply(vcon, source_data)``."""
        self.transformers.append(transformer)

    def add_validator(self, validator):
        """Register a validator; it must provide ``validate(vcon)``."""
        self.validators.append(validator)

    def add_exporter(self, exporter):
        """Register an exporter.

        NOTE: ``process_conversation`` does not invoke exporters; callers
        iterate ``self.exporters`` themselves after processing.
        """
        self.exporters.append(exporter)

    def process_conversation(self, source_data):
        """Run extract -> transform -> validate and return the built vCon."""
        # Extract: each extractor receives the previous extractor's output.
        for extractor in self.extractors:
            source_data = extractor.process(source_data)

        # Transform: every transformer enriches (and returns) the same vCon.
        vcon = Vcon.build_new()
        for transformer in self.transformers:
            vcon = transformer.apply(vcon, source_data)

        # Validate: return values are ignored, so validators signal failure
        # by raising.
        for validator in self.validators:
            validator.validate(vcon)

        return vcon

Getting Started

Installation and Setup

# Install the vCon library
pip install vcon

# Basic adapter setup
from vcon import Vcon, Party, Dialog
import json
from datetime import datetime

Simple Adapter Example

class ChatLogAdapter(BaseVconAdapter):
    """Adapter for converting JSON chat logs to vCon format.

    The chat log may contain ``platform``, ``participants`` (each with
    ``id``, ``name`` and optional ``email``/``role``) and ``messages``
    (each with ``sender_id``, ``timestamp`` and ``content``).
    """

    def extract_data(self, chat_file_path: str) -> Dict[str, Any]:
        """Load and return the JSON document stored at ``chat_file_path``."""
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(chat_file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO 8601 timestamp, accepting a trailing ``Z`` for UTC."""
        # datetime.fromisoformat() only understands "Z" from Python 3.11 on.
        if value.endswith(("Z", "z")):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)

    def transform_to_vcon(self, raw_data: Dict[str, Any]) -> Vcon:
        """Transform chat data to a vCon.

        Raises:
            KeyError: if a participant lacks ``name``/``id``, a message lacks
                a required field, or a message references an unknown sender.
            ValueError: if a message timestamp is not valid ISO 8601.
        """
        vcon = Vcon.build_new()

        # Add metadata
        vcon.add_tag("source", "chat_log")
        vcon.add_tag("platform", raw_data.get("platform", "unknown"))

        # Map each participant id to its index in the vCon parties list,
        # since dialogs reference parties by index.
        participant_map = {}
        for i, participant in enumerate(raw_data.get("participants", [])):
            party = Party(
                name=participant["name"],
                mailto=participant.get("email"),
                role=participant.get("role", "participant")
            )
            vcon.add_party(party)
            participant_map[participant["id"]] = i

        # One text dialog per message; the sender is both the only party
        # and the originator.
        for message in raw_data.get("messages", []):
            sender_index = participant_map[message["sender_id"]]
            dialog = Dialog(
                type="text",
                start=self._parse_timestamp(message["timestamp"]),
                parties=[sender_index],
                originator=sender_index,
                mimetype="text/plain",
                body=message["content"]
            )
            vcon.add_dialog(dialog)

        return vcon

# Usage
# ChatLogAdapter reads no configuration keys, so an empty config is enough.
adapter = ChatLogAdapter({})
# process() extracts, transforms and validates; it raises ValueError when the
# generated vCon fails validation.
vcon = adapter.process("chat_log.json")
# Write the validated vCon to disk.
vcon.save_to_file("conversation.vcon.json")

Core Adapter Components

1. Data Extractors

class CallRecordExtractor:
    """Collects everything known about one call from a call-center API client."""

    def __init__(self, api_client):
        self.api_client = api_client

    def extract_call_data(self, call_id: str) -> Dict[str, Any]:
        """Fetch record, recording URL, transcript and metadata for ``call_id``.

        The client is queried in that order and the four results are returned
        under the keys ``call_record``, ``recording_url``, ``transcript`` and
        ``metadata``.
        """
        client = self.api_client
        record = client.get_call(call_id)
        recording = client.get_recording_url(call_id)
        transcript = client.get_transcript(call_id)
        metadata = client.get_call_metadata(call_id)
        return dict(
            call_record=record,
            recording_url=recording,
            transcript=transcript,
            metadata=metadata,
        )

class EmailThreadExtractor:
    """Placeholder extractor for email threads."""

    def extract_thread(self, thread_id: str) -> Dict[str, Any]:
        """Extract the email thread identified by ``thread_id``.

        Not implemented in this guide: the stub performs no work and
        returns ``None``.
        """
        # Implementation for email extraction
        return None

2. Party Mappers

class PartyMapper:
    """Turns a source-system participant dict into a vCon ``Party``."""

    def __init__(self, mapping_rules: Dict[str, str]):
        # Maps the logical fields "name", "phone" and "email" to the key
        # used for that field in the source participant dict.
        self.mapping_rules = mapping_rules

    def map_party(self, source_participant: Dict[str, Any]) -> Party:
        """Build a ``Party`` from ``source_participant``.

        A logical field without a mapping rule falls back to the key of the
        same name; values missing from the participant become ``None``.
        """
        rules = self.mapping_rules
        name_key = rules.get("name", "name")
        phone_key = rules.get("phone", "phone")
        email_key = rules.get("email", "email")

        return Party(
            name=source_participant.get(name_key),
            tel=source_participant.get(phone_key),
            mailto=source_participant.get(email_key),
            role=self.determine_role(source_participant),
            uuid=source_participant.get("id")
        )

    def determine_role(self, participant: Dict[str, Any]) -> str:
        """Return "agent", "customer" or "participant"; ``is_agent`` wins over ``is_customer``."""
        if participant.get("is_agent"):
            return "agent"
        if participant.get("is_customer"):
            return "customer"
        return "participant"

3. Dialog Processors

class DialogProcessor:
    """Process different types of dialog content into vCon ``Dialog`` objects.

    ``parties_map`` arguments map a source participant id to that
    participant's index in the vCon parties list.
    """

    def parse_timestamp(self, value: Any) -> datetime:
        """Return ``value`` as a ``datetime``.

        Accepts a ``datetime`` (returned unchanged) or an ISO 8601 string,
        including the trailing ``Z`` form for UTC.

        Raises:
            TypeError: if ``value`` is neither a ``datetime`` nor a string.
            ValueError: if the string is not valid ISO 8601.
        """
        if isinstance(value, datetime):
            return value
        if not isinstance(value, str):
            raise TypeError(
                f"Unsupported timestamp type: {type(value).__name__}"
            )
        text = value.strip()
        # datetime.fromisoformat() only understands "Z" from Python 3.11 on.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        return datetime.fromisoformat(text)

    def get_involved_parties(self, message: Dict[str, Any], parties_map: Dict) -> List[int]:
        """Return the party indexes involved in ``message``.

        Only the sender is known from the message fields used in this guide,
        so the result is a one-element list. Override this to add recipients
        when the source system provides them.
        """
        return [parties_map[message["sender_id"]]]

    def process_text_message(self, message: Dict[str, Any], parties_map: Dict) -> Dialog:
        """Process a text message into a Dialog.

        Raises:
            KeyError: if a required message field is missing or the sender
                is not present in ``parties_map``.
        """
        return Dialog(
            type="text",
            start=self.parse_timestamp(message["timestamp"]),
            parties=self.get_involved_parties(message, parties_map),
            originator=parties_map[message["sender_id"]],
            mimetype="text/plain",
            body=message["content"]
        )

    def process_audio_recording(self, recording: Dict[str, Any], parties_map: Dict) -> Dialog:
        """Process an externally stored audio recording into a Dialog.

        Every party in ``parties_map`` is attached to the recording.
        """
        return Dialog(
            type="recording",
            start=self.parse_timestamp(recording["start_time"]),
            parties=list(parties_map.values()),
            mimetype="audio/wav",
            url=recording["url"],
            duration=recording.get("duration")
        )

    def process_video_call(self, video_data: Dict[str, Any], parties_map: Dict) -> Dialog:
        """Process a video call into a Dialog.

        Every party in ``parties_map`` is attached; the resolution defaults
        to "1920x1080" when the source does not provide one.
        """
        return Dialog(
            type="video",
            start=self.parse_timestamp(video_data["start_time"]),
            parties=list(parties_map.values()),
            mimetype="video/mp4",
            url=video_data["recording_url"],
            resolution=video_data.get("resolution", "1920x1080"),
            duration=video_data.get("duration")
        )

Data Mapping Strategies

1. Configuration-Driven Mapping

class ConfigurableMappingAdapter(BaseVconAdapter):
    """Adapter whose field lookups are driven by mappings held in its config."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Each mapping section is optional and defaults to an empty dict.
        self.field_mappings = config.get("field_mappings", {})
        self.party_mappings = config.get("party_mappings", {})
        self.dialog_mappings = config.get("dialog_mappings", {})

    def map_field(self, source_data: Dict, mapping_path: str, default=None):
        """Resolve the dot-separated ``mapping_path`` inside ``source_data``.

        Returns ``default`` as soon as one step of the path is missing or the
        value reached cannot be indexed with the next key.
        """
        current = source_data
        for part in mapping_path.split("."):
            try:
                current = current[part]
            except (KeyError, TypeError):
                return default
        return current

# Configuration example
# The three sections match the keys ConfigurableMappingAdapter.__init__ reads
# from its config. Values are dot-separated paths into the source record,
# the format map_field() resolves.
mapping_config = {
    # Conversation-level fields.
    "field_mappings": {
        "conversation_id": "call.id",
        "start_time": "call.started_at",
        "end_time": "call.ended_at"
    },
    # Where to find each participant attribute.
    "party_mappings": {
        "name": "participant.display_name",
        "phone": "participant.phone_number",
        "email": "participant.email_address"
    },
    # Where to find each message attribute.
    "dialog_mappings": {
        "timestamp": "message.created_at",
        "content": "message.body",
        "sender": "message.from"
    }
}

2. Schema-Based Transformation

from jsonschema import validate

class SchemaBasedAdapter(BaseVconAdapter):
    """Adapter that validates input against a schema before transformation.

    Required config keys:
        input_schema: JSON Schema the source document must satisfy.
        transformation_rules: maps each target field to a rule dict with a
            ``type`` ("direct_map" or "function"), a ``source_field`` and,
            for function rules, a ``function``.
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.input_schema = config["input_schema"]
        self.transformation_rules = config["transformation_rules"]

    def extract_data(self, source: Dict[str, Any]) -> Dict[str, Any]:
        """Validate ``source`` against the input schema and return it unchanged.

        ``jsonschema.validate`` raises when the document does not conform.
        """
        validate(source, self.input_schema)
        return source

    def apply_transformation_rules(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Apply the configured rules to ``data`` and return the normalized dict.

        Raises:
            ValueError: if a rule has an unknown ``type`` (previously such
                rules were silently dropped from the output) or names a
                function that cannot be resolved.
        """
        transformed = {}

        for target_field, rule in self.transformation_rules.items():
            rule_type = rule["type"]
            if rule_type == "direct_map":
                transformed[target_field] = data.get(rule["source_field"])
            elif rule_type == "function":
                transformed[target_field] = self.apply_function(
                    rule["function"],
                    data.get(rule["source_field"])
                )
            else:
                raise ValueError(
                    f"Unknown transformation rule type {rule_type!r} "
                    f"for field {target_field!r}"
                )

        return transformed

    def apply_function(self, function: Any, value: Any) -> Any:
        """Apply a rule's ``function`` to ``value``.

        ``function`` may be a callable, or the name of a method defined on
        this adapter (so rules loaded from JSON/YAML can refer to
        subclass-provided converters by name).

        Raises:
            ValueError: if ``function`` is neither callable nor the name of
                a callable attribute of this adapter.
        """
        if callable(function):
            return function(value)

        handler = getattr(self, function, None) if isinstance(function, str) else None
        if not callable(handler):
            raise ValueError(f"Unknown transformation function: {function!r}")
        return handler(value)

Media Handling

1. Audio Processing

class AudioMediaHandler:
    """Handle audio content in conversations."""

    # Extension -> MIME type for the audio formats adapters commonly meet.
    _AUDIO_MIMETYPES = {
        ".wav": "audio/wav",
        ".wave": "audio/wav",
        ".mp3": "audio/mpeg",
        ".ogg": "audio/ogg",
        ".oga": "audio/ogg",
        ".flac": "audio/flac",
        ".m4a": "audio/mp4",
        ".aac": "audio/aac",
        ".webm": "audio/webm",
    }

    def detect_audio_mimetype(self, location: str, default: str = "audio/wav") -> str:
        """Guess the audio MIME type from the extension of a path or URL.

        Any query string or fragment is ignored and the match is
        case-insensitive. ``default`` is returned for unknown or missing
        extensions.
        """
        import os

        path = location.split("?", 1)[0].split("#", 1)[0]
        extension = os.path.splitext(path)[1].lower()
        return self._AUDIO_MIMETYPES.get(extension, default)

    def process_audio_dialog(self, audio_data: Dict[str, Any]) -> Dialog:
        """Process audio data into a vCon dialog.

        Data flagged ``is_external`` is referenced by URL; anything else is
        read from ``file_path`` and embedded inline.
        """
        if audio_data.get("is_external"):
            return self.create_external_audio_dialog(audio_data)
        return self.create_inline_audio_dialog(audio_data)

    def create_external_audio_dialog(self, audio_data: Dict[str, Any]) -> Dialog:
        """Create a dialog that references externally hosted audio by URL."""
        return Dialog(
            type="recording",
            start=audio_data["start_time"],
            parties=audio_data["participants"],
            mimetype=self.detect_audio_mimetype(audio_data["url"]),
            url=audio_data["url"],
            duration=audio_data.get("duration"),
            content_hash=audio_data.get("checksum")
        )

    def create_inline_audio_dialog(self, audio_data: Dict[str, Any]) -> Dialog:
        """Create a dialog that embeds the audio file's bytes in its body.

        The whole file is read into memory, so this only suits short
        recordings.
        """
        import base64

        with open(audio_data["file_path"], "rb") as f:
            audio_bytes = f.read()

        # The vCon container format defines "base64url" (URL-safe alphabet)
        # for inline binary bodies, not plain "base64".
        encoded_audio = base64.urlsafe_b64encode(audio_bytes).decode("ascii")

        return Dialog(
            type="recording",
            start=audio_data["start_time"],
            parties=audio_data["participants"],
            mimetype=self.detect_audio_mimetype(audio_data["file_path"]),
            body=encoded_audio,
            encoding="base64url",
            filename=audio_data.get("filename")
        )

2. Video Processing

class VideoMediaHandler:
    """Handle video content in conversations."""

    def process_video_call(self, video_data: Dict[str, Any]) -> Dialog:
        """Build a video Dialog that references the call recording by URL.

        ``start_time``, ``participants`` and ``recording_url`` are required;
        ``resolution``, ``fps`` and ``codec`` are passed through when present.
        """
        dialog = Dialog(
            type="video",
            start=video_data["start_time"],
            parties=video_data["participants"],
            mimetype="video/mp4",
            url=video_data["recording_url"],
            resolution=video_data.get("resolution"),
            frame_rate=video_data.get("fps"),
            codec=video_data.get("codec")
        )

        # Record screen sharing; getattr() tolerates a Dialog that has no
        # metadata attribute yet.
        if video_data.get("has_screen_share"):
            dialog.metadata = getattr(dialog, "metadata", None) or {}
            dialog.metadata["screen_share"] = True

        return dialog

    def generate_thumbnail(self, video_dialog: Dialog) -> Optional[str]:
        """Return a base64-encoded 320x240 thumbnail taken 5 seconds in.

        Returns ``None`` when ``video_dialog`` does not offer a
        ``generate_thumbnail`` method.
        """
        # Imported here because the snippet has no module-level base64 import.
        import base64

        if not hasattr(video_dialog, 'generate_thumbnail'):
            return None

        thumbnail_data = video_dialog.generate_thumbnail(
            timestamp=5.0,  # 5 seconds into video
            width=320,
            height=240
        )
        return base64.b64encode(thumbnail_data).decode()

Best Practices

1. Error Handling and Resilience

class ResilientAdapter(BaseVconAdapter):
    """Adapter with retries, pluggable error handlers and a fallback vCon."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.retry_attempts = config.get("retry_attempts", 3)
        # Maps an exception class to handler(source, error, attempt).
        self.error_handlers = {}

    def register_error_handler(self, error_type: type, handler):
        """Register ``handler(source, error, attempt)`` for ``error_type``.

        The handler also applies to subclasses of ``error_type`` unless a
        more specific handler is registered.
        """
        self.error_handlers[error_type] = handler

    def _find_error_handler(self, error: Exception):
        """Return the most specific registered handler for ``error``, or None."""
        # Walking the MRO tries the exact class first, then its bases.
        for error_class in type(error).__mro__:
            if error_class in self.error_handlers:
                return self.error_handlers[error_class]
        return None

    def safe_extract_data(self, source: Any) -> Dict[str, Any]:
        """Extract data, retrying with exponential backoff.

        A registered error handler short-circuits the retries: its return
        value becomes the result. Otherwise the last failure is re-raised
        once the attempts are exhausted.
        """
        import time

        # Always try at least once, so a retry_attempts of 0 or less cannot
        # fall through and return None.
        attempts = max(1, self.retry_attempts)

        for attempt in range(attempts):
            try:
                return self.extract_data(source)
            except Exception as e:
                handler = self._find_error_handler(e)
                if handler is not None:
                    return handler(source, e, attempt)

                if attempt == attempts - 1:
                    raise

                # Exponential backoff: 1s, 2s, 4s, ...
                time.sleep(2 ** attempt)

    def partial_transform_with_fallbacks(self, raw_data: Dict[str, Any]) -> Vcon:
        """Transform ``raw_data``, falling back to a minimal vCon on any error."""
        try:
            return self.transform_to_vcon(raw_data)
        except Exception as e:
            return self.create_minimal_vcon(raw_data, e)

    def create_minimal_vcon(self, raw_data: Dict[str, Any], error: Exception) -> Vcon:
        """Create a minimal vCon recording why the full transformation failed.

        The vCon is tagged with the error and, when the raw data can be
        serialized, carries it as a JSON attachment.
        """
        vcon = Vcon.build_new()

        # Add error information
        vcon.add_tag("transformation_error", str(error))
        vcon.add_tag("partial_conversion", "true")

        # Attaching the raw data is best effort: a failure here must not mask
        # the original error, but it is recorded rather than swallowed.
        try:
            vcon.add_attachment(
                type="raw_source_data",
                body=json.dumps(raw_data),
                encoding="json"
            )
        except Exception as attach_error:
            vcon.add_tag("raw_data_attachment_error", str(attach_error))

        return vcon

2. Performance Optimization

class PerformantAdapter(BaseVconAdapter):
    """Adapter with batching, optional threading and a lookup cache.

    Optional config keys: ``batch_size`` (default 100), ``use_threading``
    (default False), ``max_workers`` (default 5) and ``cache_enabled``
    (default True).
    """

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.batch_size = config.get("batch_size", 100)
        self.use_threading = config.get("use_threading", False)
        self.max_workers = config.get("max_workers", 5)
        self.cache_enabled = config.get("cache_enabled", True)
        self._cache = {} if self.cache_enabled else None

    def process_batch(self, sources: List[Any]) -> List[Vcon]:
        """Process multiple conversations.

        The sequential path raises on the first failure; the threaded path
        logs failures and skips them.
        """
        if self.use_threading:
            return self._process_threaded_batch(sources)
        return [self.process(source) for source in sources]

    def _process_threaded_batch(self, sources: List[Any]) -> List[Vcon]:
        """Process a batch on a thread pool (suited to I/O-bound sources).

        Results come back in the order of ``sources``, matching the
        sequential path. Sources that fail are logged and left out.

        NOTE: ``process`` stores its errors on ``self.validation_errors``,
        which all worker threads share.
        """
        import logging
        from concurrent.futures import ThreadPoolExecutor

        logger = logging.getLogger(__name__)
        vcons = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit everything first, then collect in submission order.
            submitted = [
                (source, executor.submit(self.process, source))
                for source in sources
            ]

            for source, future in submitted:
                try:
                    vcons.append(future.result())
                except Exception as e:
                    logger.error("Error processing %s: %s", source, e)

        return vcons

    def cached_lookup(self, key: str, fetch_func):
        """Return the cached value for ``key``, calling ``fetch_func()`` on a miss.

        With caching disabled, ``fetch_func`` is called every time.
        """
        if not self.cache_enabled:
            return fetch_func()

        if key not in self._cache:
            self._cache[key] = fetch_func()

        return self._cache[key]

3. Logging and Monitoring

import logging
from datetime import datetime

class MonitoredAdapter(BaseVconAdapter):
    """Adapter that logs each run and keeps success/error counters."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # The logger is named after the concrete adapter class.
        self.logger = logging.getLogger(f"{self.__class__.__name__}")
        self.metrics = dict(
            processed_count=0,
            error_count=0,
            start_time=datetime.now(),
        )

    def process(self, source: Any) -> Vcon:
        """Run the base pipeline, logging the outcome and updating counters.

        Any exception is counted, logged with its traceback and re-raised.
        """
        began = datetime.now()

        try:
            self.logger.info(f"Starting processing for source: {source}")
            result = super().process(source)

            self.metrics["processed_count"] += 1
            processing_time = (datetime.now() - began).total_seconds()
            self.logger.info(
                f"Successfully processed source in {processing_time:.2f}s"
            )
            return result
        except Exception as e:
            self.metrics["error_count"] += 1
            self.logger.error(f"Failed to process source: {e}", exc_info=True)
            raise

    def get_metrics(self) -> Dict[str, Any]:
        """Return the counters plus ``runtime_seconds`` and ``success_rate``.

        ``success_rate`` is processed / (processed + errors), or 0 before
        anything has been attempted.
        """
        runtime = (datetime.now() - self.metrics["start_time"]).total_seconds()
        succeeded = self.metrics["processed_count"]
        attempted = succeeded + self.metrics["error_count"]

        if attempted > 0:
            rate = succeeded / attempted
        else:
            rate = 0

        return dict(self.metrics, runtime_seconds=runtime, success_rate=rate)

Testing Your Adapter

1. Unit Testing Framework

import unittest
from unittest.mock import Mock, patch
import tempfile
import json

class TestChatLogAdapter(unittest.TestCase):
    """Test suite for ChatLogAdapter."""

    def setUp(self):
        """Create an adapter and a two-person, two-message sample chat."""
        self.adapter = ChatLogAdapter({})

        # Timestamps use an explicit "+00:00" offset, which
        # datetime.fromisoformat() accepts on every supported Python version.
        self.sample_data = {
            "platform": "slack",
            "participants": [
                {"id": "user1", "name": "Alice", "email": "alice@example.com"},
                {"id": "user2", "name": "Bob", "email": "bob@example.com"}
            ],
            "messages": [
                {
                    "sender_id": "user1",
                    "timestamp": "2023-01-01T10:00:00+00:00",
                    "content": "Hello!"
                },
                {
                    "sender_id": "user2",
                    "timestamp": "2023-01-01T10:01:00+00:00",
                    "content": "Hi there!"
                }
            ]
        }

    def test_extract_data(self):
        """Data written to a JSON file is read back unchanged."""
        import os

        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.json', delete=False, encoding='utf-8'
            ) as f:
                temp_path = f.name
                json.dump(self.sample_data, f)

            extracted = self.adapter.extract_data(temp_path)
            self.assertEqual(extracted, self.sample_data)
        finally:
            if temp_path is not None:
                os.unlink(temp_path)

    def test_transform_to_vcon(self):
        """Participants become parties and messages become text dialogs."""
        vcon = self.adapter.transform_to_vcon(self.sample_data)

        # Verify basic structure
        self.assertEqual(len(vcon.parties), 2)
        self.assertEqual(len(vcon.dialog), 2)

        # Verify parties
        self.assertEqual(vcon.parties[0].name, "Alice")
        self.assertEqual(vcon.parties[1].name, "Bob")

        # Verify dialogs
        self.assertEqual(vcon.dialog[0]["type"], "text")
        self.assertEqual(vcon.dialog[0]["body"], "Hello!")

    def test_invalid_data_handling(self):
        """Malformed participants or messages raise KeyError.

        A dict without "participants"/"messages" is NOT an error: the adapter
        treats both as optional and would return an empty vCon.
        """
        invalid_inputs = {
            "participant without name": {
                "participants": [{"id": "user1"}],
            },
            "message from unknown sender": {
                "participants": [{"id": "user1", "name": "Alice"}],
                "messages": [
                    {
                        "sender_id": "ghost",
                        "timestamp": "2023-01-01T10:00:00+00:00",
                        "content": "Boo"
                    }
                ],
            },
        }

        for label, invalid_data in invalid_inputs.items():
            with self.subTest(case=label):
                with self.assertRaises(KeyError):
                    self.adapter.transform_to_vcon(invalid_data)

    def test_vcon_validation(self):
        """The vCon generated from the sample data passes validation."""
        vcon = self.adapter.transform_to_vcon(self.sample_data)
        is_valid, errors = vcon.is_valid()

        self.assertTrue(is_valid, f"vCon validation failed: {errors}")

class TestAdapterIntegration(unittest.TestCase):
    """Integration tests for adapter functionality."""

    def test_end_to_end_processing(self):
        """A chat log survives process -> save -> load unchanged."""
        import os

        sample_chat_data = {
            "platform": "slack",
            "participants": [
                {"id": "user1", "name": "Alice", "email": "alice@example.com"},
                {"id": "user2", "name": "Bob", "email": "bob@example.com"}
            ],
            "messages": [
                {
                    "sender_id": "user1",
                    "timestamp": "2023-01-01T10:00:00+00:00",
                    "content": "Hello!"
                }
            ]
        }

        # Initialised up front so the cleanup in ``finally`` never hits an
        # unbound name when an earlier step fails.
        input_path = None
        output_path = None

        try:
            # Create test data
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.json', delete=False, encoding='utf-8'
            ) as f:
                input_path = f.name
                json.dump(sample_chat_data, f)

            # Process through adapter
            adapter = ChatLogAdapter({})
            vcon = adapter.process(input_path)

            # Reserve an output path, then save and reload to test serialization
            with tempfile.NamedTemporaryFile(mode='w', suffix='.vcon.json', delete=False) as f:
                output_path = f.name

            vcon.save_to_file(output_path)
            reloaded_vcon = Vcon.load_from_file(output_path)

            # Verify roundtrip
            self.assertEqual(vcon.to_json(), reloaded_vcon.to_json())

        finally:
            for path in (input_path, output_path):
                if path is not None and os.path.exists(path):
                    os.unlink(path)

2. Property-Based Testing

from hypothesis import given, strategies as st

class TestAdapterProperties(unittest.TestCase):
    """Property-based checks of adapter behaviour using Hypothesis."""

    # Non-empty dicts with non-empty text keys and text/int/float values.
    _arbitrary_metadata = st.dictionaries(
        st.text(min_size=1),
        st.one_of(st.text(), st.integers(), st.floats()),
        min_size=1
    )

    # 1 to 10 participants, each a non-empty dict drawn from the keys
    # name/email/phone with non-empty text values.
    _participant_lists = st.lists(
        st.dictionaries(
            st.sampled_from(["name", "email", "phone"]),
            st.text(min_size=1),
            min_size=1
        ),
        min_size=1,
        max_size=10
    )

    @given(_arbitrary_metadata)
    def test_adapter_handles_arbitrary_metadata(self, metadata):
        """Arbitrary metadata yields JSON-serializable output or a clean error."""
        adapter = MetadataAdapter({})

        try:
            # Processing must not crash, and the result must serialize.
            json.dumps(adapter.process_metadata(metadata))
        except Exception as e:
            # The only acceptable failures are ValueError and TypeError.
            self.assertIsInstance(e, (ValueError, TypeError))

    @given(_participant_lists)
    def test_party_mapping_preserves_count(self, participants):
        """Mapping yields exactly one party per input participant."""
        adapter = PartyMappingAdapter({})
        mapped_parties = adapter.map_participants(participants)

        self.assertEqual(len(participants), len(mapped_parties))

Common Patterns

1. Multi-Source Adapter

class MultiSourceAdapter:
    """Adapter that combines data from multiple sources into one vCon."""

    def __init__(self, source_adapters: Dict[str, BaseVconAdapter]):
        # Maps a source name to the adapter that can process that source.
        self.source_adapters = source_adapters

    def process_combined(self, sources: Dict[str, Any]) -> Vcon:
        """Process each source with its adapter and merge the results.

        Raises:
            KeyError: if a source name has no registered adapter.
        """
        base_vcon = Vcon.build_new()

        for source_name, source_data in sources.items():
            adapter = self.source_adapters[source_name]
            source_vcon = adapter.process(source_data)
            base_vcon = self.merge_vcons(base_vcon, source_vcon, source_name)

        return base_vcon

    @staticmethod
    def _shift_party_refs(parties: Any, offset: int) -> Any:
        """Shift party indexes by ``offset``; accepts an int or (nested) lists."""
        if isinstance(parties, int):
            return parties + offset
        return [MultiSourceAdapter._shift_party_refs(p, offset) for p in parties]

    def merge_vcons(self, base: Vcon, source: Vcon, source_name: str) -> Vcon:
        """Append the parties and dialogs of ``source`` to ``base``.

        Parties are appended as-is (no de-duplication), so every party
        reference in the copied dialogs is shifted by the number of parties
        ``base`` held before the merge. ``source`` is left unmodified.
        """
        party_offset = len(base.parties)
        for party in source.parties:
            base.add_party(party)

        for source_dialog in source.dialog:
            # Work on a copy so the source vCon's dialog dicts stay intact.
            dialog_dict = dict(source_dialog)

            if dialog_dict.get("parties") is not None:
                dialog_dict["parties"] = self._shift_party_refs(
                    dialog_dict["parties"], party_offset
                )
            # The originator is optional and may be present but None.
            if dialog_dict.get("originator") is not None:
                dialog_dict["originator"] += party_offset

            # Tag the dialog with its source; copy the metadata so the tag
            # does not leak back into the source vCon.
            dialog = Dialog(**dialog_dict)
            metadata = dict(getattr(dialog, "metadata", None) or {})
            metadata["source"] = source_name
            dialog.metadata = metadata

            base.add_dialog(dialog)

        return base

2. Streaming Adapter

class StreamingAdapter:
    """Adapter for processing streaming conversation data.

    Events are buffered per ``conversation_id`` until an event of type
    "conversation_ended" arrives, at which point the buffered events are
    turned into a vCon.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # conversation_id -> {"events": [...], "last_updated": datetime}
        self.conversation_buffer = {}
        self.buffer_timeout = config.get("buffer_timeout", 300)  # 5 minutes

    def process_stream_event(self, event: Dict[str, Any]) -> Optional[Vcon]:
        """Buffer ``event``; return a vCon if it completes its conversation.

        Returns ``None`` while the conversation is still in progress.
        """
        conversation_id = event.get("conversation_id")
        now = datetime.now()

        entry = self.conversation_buffer.setdefault(
            conversation_id, {"events": [], "last_updated": now}
        )
        entry["events"].append(event)
        entry["last_updated"] = now

        if self.is_conversation_complete(conversation_id, event):
            return self.finalize_conversation(conversation_id)

        return None

    def is_conversation_complete(self, conversation_id: str, event: Dict[str, Any]) -> bool:
        """Return True when ``event`` is the "conversation_ended" marker."""
        return event.get("type") == "conversation_ended"

    def process_event_to_vcon(self, vcon: Vcon, event: Dict[str, Any]) -> None:
        """Apply one buffered event to ``vcon``; subclasses must override.

        How an event maps to parties and dialogs depends on the source
        system, so no default is provided.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement process_event_to_vcon()"
        )

    def finalize_conversation(self, conversation_id: str) -> Vcon:
        """Convert the buffered events of a conversation into a vCon.

        Events are applied in ascending ``timestamp`` order. The buffer entry
        is removed only after every event was applied successfully.
        """
        events = self.conversation_buffer[conversation_id]["events"]

        vcon = Vcon.build_new()
        vcon.add_tag("conversation_id", conversation_id)

        for event in sorted(events, key=lambda e: e.get("timestamp", "")):
            self.process_event_to_vcon(vcon, event)

        del self.conversation_buffer[conversation_id]

        return vcon

    def cleanup_stale_conversations(self) -> List[Vcon]:
        """Force-finalize conversations idle longer than ``buffer_timeout``.

        Returns the vCons built from the stale conversations so the caller
        can store them instead of losing the buffered events.
        """
        now = datetime.now()
        # total_seconds() is required: timedelta.seconds drops whole days, so
        # a conversation idle for 24h+ would otherwise look fresh.
        stale_conversations = [
            conv_id for conv_id, data in self.conversation_buffer.items()
            if (now - data["last_updated"]).total_seconds() > self.buffer_timeout
        ]

        return [
            self.finalize_conversation(conv_id)
            for conv_id in stale_conversations
        ]

3. Incremental Adapter

class IncrementalAdapter:
    """Adapter that processes conversations incrementally with checkpoints.

    Subclasses provide the source-specific hooks ``process_item``,
    ``is_before_checkpoint`` and ``get_item_checkpoint``.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.checkpoint_manager = CheckpointManager(config.get("checkpoint_file"))

    def process_incremental(self, source_iterator) -> Iterator[Vcon]:
        """Yield a vCon for every item not covered by the last checkpoint.

        Items that fail to convert are reported to
        ``handle_processing_error`` and skipped. The checkpoint advances only
        after the consumer has taken the yielded vCon and asked for the next
        one. Checkpoint write failures propagate instead of being treated as
        item errors.
        """
        last_checkpoint = self.checkpoint_manager.get_last_checkpoint()

        for item in source_iterator:
            # Skip items already processed
            if self.is_before_checkpoint(item, last_checkpoint):
                continue

            # Only the conversion itself is guarded, so a broken checkpoint
            # store is not mistaken for a bad item.
            try:
                vcon = self.process_item(item)
            except Exception as e:
                self.handle_processing_error(item, e)
                continue

            yield vcon

            self.checkpoint_manager.update_checkpoint(
                self.get_item_checkpoint(item)
            )

    def process_item(self, item: Any) -> Vcon:
        """Process a single item to vCon (placeholder; override in subclasses)."""
        # Implementation specific to source format
        pass

    def is_before_checkpoint(self, item: Any, checkpoint: Any) -> bool:
        """Check if item was processed in a previous run.

        Placeholder: returns ``None`` (falsy), so no item is skipped until a
        subclass overrides it.
        """
        # Implementation specific to source format
        pass

    def get_item_checkpoint(self, item: Any) -> Any:
        """Return the JSON-serializable checkpoint value for ``item``.

        Must be overridden: the value that orders items (timestamp, id,
        offset, ...) depends on the source format.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement get_item_checkpoint()"
        )

    def handle_processing_error(self, item: Any, error: Exception) -> None:
        """Report an item that could not be converted; the default logs it."""
        import logging

        logging.getLogger(__name__).error(
            "Failed to process item %r: %s", item, error
        )

class CheckpointManager:
    """Persist the processing checkpoint as JSON in a single file."""

    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = checkpoint_file

    def get_last_checkpoint(self) -> Any:
        """Return the stored checkpoint, or ``None`` if no file exists yet."""
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def update_checkpoint(self, checkpoint: Any):
        """Atomically replace the stored checkpoint with ``checkpoint``.

        The value is written to a temporary file in the same directory and
        then renamed over the target, so a crash or a serialization error
        never leaves a truncated checkpoint file behind.

        Raises:
            TypeError: if ``checkpoint`` is not JSON serializable (the
                previous checkpoint is left intact).
        """
        import os
        import tempfile

        directory = os.path.dirname(os.path.abspath(self.checkpoint_file))
        fd, temp_path = tempfile.mkstemp(
            dir=directory, prefix=".checkpoint-", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(checkpoint, f)
            # Same-directory rename, atomic on POSIX and Windows.
            os.replace(temp_path, self.checkpoint_file)
        except BaseException:
            # Do not leave the temporary file behind on failure.
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            raise

Troubleshooting

Common Issues and Solutions

1. Invalid vCon Generation

Problem: Generated vCons fail validation

Solutions:

def debug_vcon_validation(vcon: Vcon):
    """Print why ``vcon`` fails validation; print nothing when it is valid.

    After the validator's own errors, three common causes are checked:
    no parties, no dialog entries, and dialogs whose highest party index
    is outside the parties list.
    """
    is_valid, errors = vcon.is_valid()
    if is_valid:
        return

    print("Validation errors:")
    for error in errors:
        print(f"  - {error}")

    # Check specific common issues
    if not vcon.parties:
        print("  Issue: No parties defined")

    if not vcon.dialog:
        print("  Issue: No dialog entries")

    for i, dialog in enumerate(vcon.dialog):
        if "parties" not in dialog:
            continue
        referenced = dialog["parties"]
        # An empty party list cannot be out of range.
        max_party_idx = max(referenced) if referenced else -1
        if max_party_idx >= len(vcon.parties):
            print(f"  Issue: Dialog {i} references invalid party index {max_party_idx}")

2. Memory Issues with Large Conversations

Problem: Very large conversations can exhaust memory when the entire vCon is built in memory before it is saved

Solutions:

class StreamingVconWriter:
    """Build a large vCon batch by batch, saving it to disk along the way.

    NOTE: each flush rewrites the complete vCon to ``output_file``; dialogs
    stay in memory, so this bounds data loss rather than memory use.
    """

    def __init__(self, output_file: str, flush_threshold: int = 1000):
        self.output_file = output_file
        # Flush once the vCon holds more than this many dialogs.
        self.flush_threshold = flush_threshold
        self.base_vcon = None

    def initialize_vcon(self, metadata: Dict[str, Any]):
        """Create the base vCon; must be called before adding dialogs."""
        self.base_vcon = Vcon.build_new()
        # Add metadata, parties, etc.

    def _require_initialized(self):
        """Raise a clear error when the base vCon has not been created yet."""
        if self.base_vcon is None:
            raise RuntimeError(
                "initialize_vcon() must be called before using the writer"
            )

    def add_dialog_batch(self, dialogs: List[Dialog]):
        """Add a batch of dialogs, flushing once the threshold is exceeded.

        Raises:
            RuntimeError: if ``initialize_vcon`` has not been called.
        """
        self._require_initialized()

        for dialog in dialogs:
            self.base_vcon.add_dialog(dialog)

        # Periodically flush to disk if needed
        if len(self.base_vcon.dialog) > self.flush_threshold:
            self.flush_to_disk()

    def flush_to_disk(self):
        """Save the current vCon to ``output_file``.

        Raises:
            RuntimeError: if ``initialize_vcon`` has not been called.
        """
        self._require_initialized()
        self.base_vcon.save_to_file(self.output_file)

3. Character Encoding Issues

Problem: Source data contains bytes that are not valid UTF-8

Solutions:

def safe_decode_text(raw_text: bytes, fallback_encoding: str = "latin-1") -> str:
    """Decode ``raw_text`` as UTF-8, then ``fallback_encoding``, then lossily.

    If both strict decodes fail, the bytes are decoded as UTF-8 with invalid
    sequences replaced by U+FFFD.
    """
    for candidate in ("utf-8", fallback_encoding):
        try:
            return raw_text.decode(candidate)
        except UnicodeDecodeError:
            continue

    # Last resort: replace invalid characters
    return raw_text.decode("utf-8", errors="replace")

def sanitize_content(content: str) -> str:
    """Return ``content`` without NUL characters, normalized to Unicode NFKC."""
    import unicodedata

    # Splitting on NUL and re-joining drops every null character.
    without_nulls = "".join(content.split("\x00"))

    # NFKC folds compatibility forms (ligatures, full-width letters, ...)
    # into their canonical equivalents.
    return unicodedata.normalize("NFKC", without_nulls)

Testing and Validation Tools

class AdapterTestSuite:
    """Comprehensive test suite for vCon adapters."""

    def __init__(self, adapter: BaseVconAdapter):
        self.adapter = adapter

    def run_full_test_suite(self, test_data: List[Any]) -> Dict[str, Any]:
        """Process every item in ``test_data`` and summarize the outcome.

        An item counts as successful when the adapter returns a vCon that
        validates. Invalid vCons and raised exceptions count as failed and
        are described in ``errors``.
        """
        results = {
            "total_tests": len(test_data),
            "successful": 0,
            "failed": 0,
            "errors": []
        }

        for i, data in enumerate(test_data):
            try:
                vcon = self.adapter.process(data)

                # Validate vCon
                is_valid, errors = vcon.is_valid()
                if is_valid:
                    results["successful"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append(f"Test {i}: {errors}")

            except Exception as e:
                results["failed"] += 1
                results["errors"].append(f"Test {i}: Exception {e}")

        return results

    def benchmark_adapter(self, test_data: List[Any]) -> Dict[str, float]:
        """Time the adapter over ``test_data``.

        Returns ``total_time`` (seconds), ``average_time_per_item`` and
        ``items_per_second``. Empty input yields zeros instead of a
        ZeroDivisionError.
        """
        import time

        if not test_data:
            return {
                "total_time": 0.0,
                "average_time_per_item": 0.0,
                "items_per_second": 0.0
            }

        # perf_counter() is monotonic and high resolution, unlike time.time().
        start_time = time.perf_counter()

        for data in test_data:
            self.adapter.process(data)

        total_time = time.perf_counter() - start_time

        return {
            "total_time": total_time,
            "average_time_per_item": total_time / len(test_data),
            # Guard against a zero reading on very coarse clocks.
            "items_per_second": (
                len(test_data) / total_time if total_time > 0 else float("inf")
            )
        }

This guide provides a comprehensive foundation for building robust vCon adapters. The patterns and examples can be adapted for specific use cases and source systems while maintaining compliance with the vCon specification.

Last updated

Was this helpful?