LLM/Agentic AI Interview Questions - Hard

Hard-level LLM and Agentic AI interview questions covering multi-agent systems, production optimization, and advanced architectures.

Q1: Design a multi-agent system with agent communication and coordination.

Answer:

How Multi-Agent Systems Work:

Multiple specialized agents collaborate, communicate, and coordinate to solve complex tasks.

LangChain Implementation:

```python
from langchain.agents import initialize_agent, AgentType, AgentExecutor
from langchain.agents import create_openai_functions_agent
from langchain.tools import Tool
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain, SequentialChain
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.schema import HumanMessage, SystemMessage
from langchain.utilities import WikipediaAPIWrapper
from typing import Dict, List
import json

# Define specialized tools for each agent
wikipedia = WikipediaAPIWrapper()

def search_tool(query: str) -> str:
    """Search for information"""
    return wikipedia.run(query)

def analyze_tool(data: str) -> str:
    """Analyze data and extract insights"""
    # This would use an analysis LLM chain
    return f"Analysis of: {data[:100]}..."

def synthesize_tool(inputs: str) -> str:
    """Synthesize multiple inputs"""
    # This would use a synthesis LLM chain
    return f"Synthesized from: {inputs[:100]}..."

# Create tools for each agent
research_tools = [
    Tool(
        name="search",
        func=search_tool,
        description="Search for information on a topic"
    )
]

analysis_tools = [
    Tool(
        name="analyze",
        func=analyze_tool,
        description="Analyze data and extract insights"
    )
]

# Defined for completeness; the synthesis step below uses an LLMChain instead of an agent
synthesis_tools = [
    Tool(
        name="synthesize",
        func=synthesize_tool,
        description="Synthesize information from multiple sources"
    )
]

# Initialize LLMs
llm = OpenAI(temperature=0)
chat_llm = ChatOpenAI(temperature=0)

# Create specialized agents
research_agent = initialize_agent(
    tools=research_tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    agent_kwargs={
        "prefix": "You are a research agent. Your job is to search for and gather information."
    }
)

analysis_agent = initialize_agent(
    tools=analysis_tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    agent_kwargs={
        "prefix": "You are an analysis agent. Your job is to analyze data and extract insights."
    }
)

# Synthesis agent using LLMChain
synthesis_prompt = PromptTemplate(
    input_variables=["inputs"],
    template="""Synthesize the following information from multiple agents into a coherent response:

{inputs}

Create a comprehensive, well-structured answer that:
1. Integrates all relevant information
2. Resolves any contradictions
3. Provides clear conclusions

Synthesis:"""
)

synthesis_chain = LLMChain(llm=llm, prompt=synthesis_prompt)

# Orchestrator: plans with the LLM, then executes agents in order
class MultiAgentOrchestrator:
    def __init__(self, research_agent, analysis_agent, synthesis_chain, llm):
        self.research_agent = research_agent
        self.analysis_agent = analysis_agent
        self.synthesis_chain = synthesis_chain
        self.llm = llm

    def create_plan(self, query: str) -> List[Dict]:
        """Use the LLM to create an execution plan"""
        plan_prompt = f"""Given this query: "{query}"

Available agents:
- research_agent: Search and gather information
- analysis_agent: Analyze data and extract insights
- synthesis_chain: Synthesize multiple inputs

Create an execution plan. Respond with JSON:
{{
    "steps": [
        {{"agent": "research_agent", "task": "description"}},
        {{"agent": "analysis_agent", "task": "description"}},
        {{"agent": "synthesis_chain", "task": "description"}}
    ]
}}"""

        response = self.llm(plan_prompt)
        try:
            plan = json.loads(response)
            return plan.get("steps", [])
        except json.JSONDecodeError:
            # Fallback plan if the LLM does not return valid JSON
            return [
                {"agent": "research_agent", "task": query},
                {"agent": "analysis_agent", "task": "analyze research results"},
                {"agent": "synthesis_chain", "task": "synthesize all results"}
            ]

    def execute(self, query: str) -> Dict:
        """Execute the multi-agent workflow"""
        # Step 1: Create plan
        plan = self.create_plan(query)

        results = {}

        # Step 2: Execute each agent
        for step in plan:
            agent_name = step["agent"]
            task = step["task"]

            if agent_name == "research_agent":
                result = self.research_agent.run(task)
                results["research"] = result

            elif agent_name == "analysis_agent":
                # Use research results if available
                data = results.get("research", task)
                result = self.analysis_agent.run(f"Analyze: {data}")
                results["analysis"] = result

            elif agent_name == "synthesis_chain":
                # Combine all results
                inputs = "\n\n".join([
                    f"{key}: {value}"
                    for key, value in results.items()
                ])
                result = self.synthesis_chain.run(inputs=inputs)
                results["synthesis"] = result

        return results

# Usage
orchestrator = MultiAgentOrchestrator(
    research_agent=research_agent,
    analysis_agent=analysis_agent,
    synthesis_chain=synthesis_chain,
    llm=llm
)

result = orchestrator.execute("What are the latest developments in quantum computing?")
print(json.dumps(result, indent=2))

# Alternative: Using SequentialChain for orchestration
research_prompt = PromptTemplate(
    input_variables=["query"],
    template="Research the following query: {query}\n\nProvide comprehensive information:"
)

analysis_prompt = PromptTemplate(
    input_variables=["research"],
    template="Analyze the following research: {research}\n\nExtract key insights:"
)

synthesis_prompt = PromptTemplate(
    input_variables=["research", "analysis"],
    template="Synthesize:\nResearch: {research}\nAnalysis: {analysis}\n\nFinal answer:"
)

# Create chains
research_chain = LLMChain(llm=llm, prompt=research_prompt, output_key="research")
analysis_chain = LLMChain(llm=llm, prompt=analysis_prompt, output_key="analysis")
synthesis_chain = LLMChain(llm=llm, prompt=synthesis_prompt, output_key="answer")

# Sequential execution
multi_agent_chain = SequentialChain(
    chains=[research_chain, analysis_chain, synthesis_chain],
    input_variables=["query"],
    output_variables=["research", "analysis", "answer"],
    verbose=True
)

result = multi_agent_chain({"query": "What are the latest developments in quantum computing?"})
print(result["answer"])
```

Key Design Decisions:

  1. Message Bus: Decouples agents and enables asynchronous communication (see the sketch after this list)
  2. Orchestrator: Central coordinator for complex workflows (the pattern shown above)
  3. Dependency Management: Agents can depend on other agents' results
  4. Async Execution: Run independent agents in parallel when possible
  5. Correlation IDs: Track request-response pairs across agents
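
The orchestrator above calls each agent directly. As a rough sketch of the message-bus pattern with correlation IDs from the list above (the AgentMessage and MessageBus classes and the research_worker are illustrative, not part of LangChain), agents can exchange messages over per-agent queues:

```python
import asyncio
import uuid
from dataclasses import dataclass, field

@dataclass
class AgentMessage:
    sender: str
    recipient: str
    payload: str
    # Correlation ID ties a reply back to its originating request
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

class MessageBus:
    """Minimal in-process bus: one asyncio.Queue per registered agent."""
    def __init__(self):
        self.queues: dict[str, asyncio.Queue] = {}

    def register(self, agent_name: str) -> asyncio.Queue:
        self.queues[agent_name] = asyncio.Queue()
        return self.queues[agent_name]

    async def send(self, message: AgentMessage) -> None:
        await self.queues[message.recipient].put(message)

async def research_worker(bus: MessageBus, inbox: asyncio.Queue):
    """Consume research requests and reply to the sender."""
    while True:
        msg = await inbox.get()
        answer = f"research results for: {msg.payload}"  # call research_agent.run(msg.payload) here
        await bus.send(AgentMessage(
            sender="research_agent",
            recipient=msg.sender,
            payload=answer,
            correlation_id=msg.correlation_id,  # keep the original correlation ID
        ))

async def demo():
    bus = MessageBus()
    orchestrator_inbox = bus.register("orchestrator")
    research_inbox = bus.register("research_agent")
    worker = asyncio.create_task(research_worker(bus, research_inbox))

    request = AgentMessage(sender="orchestrator", recipient="research_agent",
                           payload="quantum computing")
    await bus.send(request)
    reply = await orchestrator_inbox.get()
    assert reply.correlation_id == request.correlation_id  # match reply to request
    print(reply.payload)
    worker.cancel()

asyncio.run(demo())
```

Because agents only see the bus, they can be added, removed, or run in separate processes without changing each other, and several requests can be in flight at once.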

Benefits:

  • Specialization (each agent is expert in domain)
  • Scalability (add new agents easily)
  • Fault tolerance (one agent failure doesn't break system)
  • Flexibility (dynamic planning based on query)

Q2: Optimize LLM inference for production (latency, throughput, cost).

Answer:

Optimization Strategies:

1. Token Usage Optimization

LangChain Implementation:

```python
import numpy as np  # used by extract_relevant_chunks

class TokenOptimizer:
    def __init__(self, tokenizer, max_tokens: int = 4096):
        self.tokenizer = tokenizer
        self.max_tokens = max_tokens

    def compress_prompt(self, prompt: str, target_tokens: int) -> str:
        """Compress prompt to target token count"""
        tokens = self.tokenizer.encode(prompt)

        if len(tokens) <= target_tokens:
            return prompt

        # Strategy 1: Remove middle content (keep start and end)
        keep_start = target_tokens // 2
        keep_end = target_tokens - keep_start

        compressed_tokens = tokens[:keep_start] + tokens[-keep_end:]

        return self.tokenizer.decode(compressed_tokens)

    def summarize_context(self, context: str, llm, max_summary_tokens: int = 500) -> str:
        """Summarize long context (assumes an llm exposing generate(prompt, max_tokens=...))"""
        tokens = self.tokenizer.encode(context)

        if len(tokens) <= max_summary_tokens:
            return context

        prompt = f"""Summarize the following in under {max_summary_tokens} tokens, preserving key information:

{context}

Summary:"""

        summary = llm.generate(prompt, max_tokens=max_summary_tokens)
        return summary

    def extract_relevant_chunks(self, context: str, query: str,
                                embedding_model, max_chunks: int = 3) -> str:
        """Extract only relevant parts of context"""
        # Split into chunks
        chunks = context.split('\n\n')

        # Embed query and chunks (sentence-transformers style encode())
        query_emb = embedding_model.encode([query])[0]
        chunk_embs = embedding_model.encode(chunks)

        # Calculate cosine similarities
        similarities = np.dot(chunk_embs, query_emb)
        similarities /= (np.linalg.norm(chunk_embs, axis=1) * np.linalg.norm(query_emb))

        # Get top chunks
        top_indices = np.argsort(similarities)[-max_chunks:][::-1]

        return '\n\n'.join([chunks[i] for i in top_indices])
```

2. Caching Strategy

LangChain Implementation:

```python
from langchain.cache import InMemoryCache, RedisCache
from langchain.globals import set_llm_cache
from langchain.llms import OpenAI
import redis

# In-memory caching
set_llm_cache(InMemoryCache())

# Redis caching
redis_client = redis.Redis(host='localhost', port=6379, db=0)
set_llm_cache(RedisCache(redis_client))

# Usage - caching is automatic
llm = OpenAI(temperature=0)

# First call - generates and caches
result1 = llm("What is Python?")

# Second call - returns from cache (near-instant)
result2 = llm("What is Python?")

# Custom caching wrapper (simplified: the real BaseCache interface stores
# lists of Generation objects rather than raw strings)
from langchain.cache import BaseCache
from typing import Optional
import hashlib

class CustomCache(BaseCache):
    def __init__(self, redis_client):
        self.redis = redis_client

    def lookup(self, prompt: str, llm_string: str) -> Optional[str]:
        """Look up cached result"""
        key = self._generate_key(prompt, llm_string)
        cached = self.redis.get(key)
        if cached:
            return cached.decode()
        return None

    def update(self, prompt: str, llm_string: str, return_val: str):
        """Cache result"""
        key = self._generate_key(prompt, llm_string)
        self.redis.setex(key, 3600, return_val)  # 1 hour TTL

    def clear(self, **kwargs):
        """Clear the cache (required by the BaseCache interface)"""
        self.redis.flushdb()

    def _generate_key(self, prompt: str, llm_string: str) -> str:
        """Generate cache key"""
        content = f"{prompt}|{llm_string}"
        return hashlib.md5(content.encode()).hexdigest()

# Use custom cache
custom_cache = CustomCache(redis_client)
set_llm_cache(custom_cache)
```

3. Batching and Parallelization

LangChain Implementation:

```python
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.callbacks import get_openai_callback
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
import asyncio
from typing import List

# Batch processing with LangChain
llm = OpenAI(temperature=0, batch_size=10)

# Process multiple prompts in batch
prompts = [
    "What is Python?",
    "What is JavaScript?",
    "What is Go?",
]

# Batch generation (if the LLM supports it)
results = llm.generate(prompts)

# Parallel processing with async
async def process_prompt_async(llm, prompt: str) -> str:
    """Process a single prompt asynchronously (chat models take lists of messages)"""
    result = await llm.agenerate([[HumanMessage(content=prompt)]])
    return result.generations[0][0].text

async def process_batch_async(prompts: List[str]) -> List[str]:
    """Process a batch of prompts in parallel"""
    chat_llm = ChatOpenAI(temperature=0)

    tasks = [process_prompt_async(chat_llm, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)

    return results

# Usage
results = asyncio.run(process_batch_async(prompts))

# Batch processing with chains
template = "Answer: {question}"
prompt = PromptTemplate(template=template, input_variables=["question"])
chain = LLMChain(llm=llm, prompt=prompt)

# Process multiple inputs
inputs = [{"question": q} for q in prompts]
results = chain.apply(inputs)

# With token tracking
with get_openai_callback() as cb:
    results = chain.apply(inputs)
    print(f"Total tokens: {cb.total_tokens}")
    print(f"Total cost: ${cb.total_cost:.4f}")
```

4. Model Selection and Routing

LangChain Implementation:

```python
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

class ModelRouter:
    def __init__(self):
        self.models = {
            "fast": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
            "balanced": ChatOpenAI(model="gpt-4", temperature=0),
            "powerful": ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)
        }

    def route(self, prompt: str, requirements: dict) -> str:
        """Route to the appropriate model based on requirements"""
        max_latency = requirements.get("max_latency", float('inf'))
        max_cost = requirements.get("max_cost", float('inf'))
        min_quality = requirements.get("min_quality", 0)

        # Estimate complexity
        complexity = self._estimate_complexity(prompt)

        # Select model
        if complexity < 0.3 and max_latency < 2.0:
            return "fast"
        elif max_cost < 0.01:
            return "fast"
        elif complexity > 0.7 or min_quality > 0.8:
            return "powerful"
        else:
            return "balanced"

    def _estimate_complexity(self, prompt: str) -> float:
        """Estimate query complexity (0-1) from simple keyword heuristics"""
        indicators = {
            "complex": ["analyze", "compare", "evaluate", "design"],
            "simple": ["what", "when", "who", "list"]
        }

        prompt_lower = prompt.lower()
        complex_count = sum(1 for word in indicators["complex"] if word in prompt_lower)
        simple_count = sum(1 for word in indicators["simple"] if word in prompt_lower)

        if complex_count + simple_count == 0:
            return 0.5

        return complex_count / (complex_count + simple_count)

    def generate(self, prompt: str, requirements: dict = None) -> str:
        """Generate using the routed model"""
        model_key = self.route(prompt, requirements or {})
        llm = self.models[model_key]
        return llm.predict(prompt)

# Usage
router = ModelRouter()
result = router.generate("What is Python?", {"max_latency": 1.5})
```

5. Streaming for Better UX

LangChain Implementation:

```python
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
import asyncio

# Streaming with a callback handler
llm = OpenAI(
    temperature=0,
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Generate with streaming (tokens are printed as they arrive)
llm("Explain quantum computing in detail")

# Async streaming via a custom callback handler
class AsyncStreamingHandler(AsyncCallbackHandler):
    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end='', flush=True)

chat_llm = ChatOpenAI(
    temperature=0,
    streaming=True,
    callbacks=[AsyncStreamingHandler()]
)

# Stream the response with astream (note: the handler above also prints tokens,
# so in practice use either the callback or the astream loop, not both)
async def stream_response(prompt: str):
    async for chunk in chat_llm.astream(prompt):
        print(chunk.content, end='', flush=True)

asyncio.run(stream_response("Explain quantum computing"))
```

Production Metrics to Track (a minimal tracking sketch follows this list):

  • Latency: p50, p95, p99 response times
  • Throughput: Requests per second
  • Cost: $ per 1K tokens
  • Cache hit rate: % of cached responses
  • Token usage: Average tokens per request
  • Error rate: % of failed requests
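
As a rough illustration (the LLMMetrics class below is hypothetical; a production system would typically export these to Prometheus/Grafana instead, as in Q3), a small in-process tracker can compute the percentiles and rates listed above:

```python
import time

class LLMMetrics:
    """Tiny in-process tracker for the metrics listed above."""
    def __init__(self):
        self.latencies: list[float] = []
        self.total_tokens = 0
        self.total_cost = 0.0
        self.requests = 0
        self.errors = 0
        self.cache_hits = 0

    def record(self, latency_s: float, tokens: int, cost: float,
               error: bool = False, cache_hit: bool = False) -> None:
        self.requests += 1
        self.latencies.append(latency_s)
        self.total_tokens += tokens
        self.total_cost += cost
        self.errors += int(error)
        self.cache_hits += int(cache_hit)

    def percentile(self, p: float) -> float:
        """p in [0, 100]; e.g. 95 for p95 latency."""
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        idx = min(len(ordered) - 1, int(round(p / 100 * (len(ordered) - 1))))
        return ordered[idx]

    def report(self) -> dict:
        return {
            "p50_s": self.percentile(50),
            "p95_s": self.percentile(95),
            "p99_s": self.percentile(99),
            "avg_tokens_per_request": self.total_tokens / max(self.requests, 1),
            "cost_per_1k_tokens": 1000 * self.total_cost / max(self.total_tokens, 1),
            "cache_hit_rate": self.cache_hits / max(self.requests, 1),
            "error_rate": self.errors / max(self.requests, 1),
        }

# Usage: wrap each LLM call
metrics = LLMMetrics()
start = time.time()
# ... call the LLM here ...
metrics.record(latency_s=time.time() - start, tokens=350, cost=0.0007, cache_hit=False)
print(metrics.report())
```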

Q3: Design a production-grade RAG system with monitoring and failure handling.

Answer:

Production RAG Architecture:

LangChain Implementation:

```python
import numpy as np  # used by the manual retrieval helpers below
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
from langchain.cache import RedisCache
from langchain.callbacks import get_openai_callback
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Optional, Dict
import logging
import time
import asyncio
from dataclasses import dataclass
from prometheus_client import Counter, Histogram, Gauge

# Metrics
request_counter = Counter('rag_requests_total', 'Total RAG requests')
request_duration = Histogram('rag_request_duration_seconds', 'Request duration')
cache_hits = Counter('rag_cache_hits_total', 'Cache hits')
errors = Counter('rag_errors_total', 'Errors', ['error_type'])

@dataclass
class RAGConfig:
    max_retries: int = 3
    timeout: float = 30.0
    cache_ttl: int = 3600
    max_context_tokens: int = 3000
    min_similarity_threshold: float = 0.7
    enable_reranking: bool = True
    fallback_enabled: bool = True

class ProductionRAG:
    def __init__(self, config: RAGConfig, llm, embeddings, vectorstore, cache):
        self.config = config
        self.llm = llm
        self.embeddings = embeddings
        self.vectorstore = vectorstore
        self.cache = cache
        self.logger = logging.getLogger(__name__)

        # Create retriever with LangChain (score_threshold requires the
        # similarity_score_threshold search type)
        self.retriever = vectorstore.as_retriever(
            search_type="similarity_score_threshold",
            search_kwargs={"k": 5, "score_threshold": config.min_similarity_threshold}
        )

        # Add re-ranking if enabled. CrossEncoderReranker expects LangChain's
        # cross-encoder wrapper rather than a raw sentence_transformers model;
        # the import path may vary across LangChain versions.
        if config.enable_reranking:
            from langchain_community.cross_encoders import HuggingFaceCrossEncoder
            reranker = HuggingFaceCrossEncoder(model_name='cross-encoder/ms-marco-MiniLM-L-6-v2')
            compressor = CrossEncoderReranker(model=reranker, top_n=3)
            self.retriever = ContextualCompressionRetriever(
                base_compressor=compressor,
                base_retriever=self.retriever
            )

        # Create QA chain with LangChain
        prompt_template = """Use the following pieces of context to answer the question.
If you don't know the answer, just say that you don't know.

Context: {context}

Question: {question}

Answer:"""

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": PROMPT}
        )

        # Enable caching (RedisCache takes the Redis client as its first argument)
        from langchain.globals import set_llm_cache
        set_llm_cache(RedisCache(cache))

    def query(self, question: str, user_id: str) -> Dict:
        """Main query endpoint with full production features using LangChain"""
        request_counter.inc()
        start_time = time.time()

        try:
            # 1. Validate input
            validation_error = self._validate_input(question)
            if validation_error:
                errors.labels(error_type='validation').inc()
                return {"error": validation_error, "status": "invalid_input"}

            # 2. Check cache (handled automatically by LangChain via set_llm_cache)

            # 3. Query using the LangChain QA chain
            try:
                result = self.qa_chain({"query": question})

                # 4. Prepare response
                answer = result["result"]
                sources = [
                    {
                        "text": doc.page_content,
                        "metadata": doc.metadata,
                        "score": getattr(doc, 'score', 0.0)
                    }
                    for doc in result.get("source_documents", [])
                ]

                # 5. Validate response
                if not self._validate_response(answer):
                    errors.labels(error_type='invalid_response').inc()
                    if self.config.fallback_enabled:
                        return self._fallback_response(question)

                # 6. Prepare result
                response = {
                    "answer": answer,
                    "sources": sources,
                    "status": "success",
                    "metadata": {
                        "num_sources": len(sources),
                        "cache_hit": False,  # LangChain handles caching
                        "duration_ms": (time.time() - start_time) * 1000
                    }
                }

                # 7. Log metrics
                request_duration.observe(time.time() - start_time)
                self.logger.info(f"Successfully processed query for user {user_id}")

                return response

            except Exception as e:
                errors.labels(error_type='generation').inc()
                self.logger.error(f"Error generating answer: {str(e)}")

                if self.config.fallback_enabled:
                    return self._fallback_response(question)

                return {"error": "Failed to generate answer", "status": "error"}

        except Exception as e:
            errors.labels(error_type='unexpected').inc()
            self.logger.error(f"Unexpected error: {str(e)}", exc_info=True)

            if self.config.fallback_enabled:
                return self._fallback_response(question)

            return {"error": "Internal error", "status": "error"}

    def _validate_input(self, question: str) -> Optional[str]:
        """Validate user input"""
        if not question or len(question.strip()) == 0:
            return "Question cannot be empty"

        if len(question) > 1000:
            return "Question too long (max 1000 characters)"

        # Check for malicious content
        if self._contains_injection_attempt(question):
            return "Invalid characters detected"

        return None

    def _contains_injection_attempt(self, text: str) -> bool:
        """Simple injection detection"""
        dangerous_patterns = ["<script>", "javascript:", "eval(", "exec("]
        return any(pattern in text.lower() for pattern in dangerous_patterns)

    # --- Alternative manual retrieval pipeline (not wired into query() above) ---
    # _retrieve_with_retry, _embed_with_fallback, _generate_answer, and
    # _assemble_context assume extra attributes that are not set in __init__:
    # an async vector DB client (self.vector_db), primary/fallback embedding
    # models, a tokenizer, a self._rerank() helper, and an LLM exposing
    # generate_async(). They illustrate retry, fallback, and context budgeting
    # when not using RetrievalQA.

    async def _retrieve_with_retry(self, question: str) -> List[Dict]:
        """Retrieve documents with retry logic"""
        last_error = None

        for attempt in range(self.config.max_retries):
            try:
                # Embed query
                embedding = await self._embed_with_fallback(question)

                # Search vector DB
                results = await self.vector_db.search(
                    embedding,
                    top_k=20,
                    min_score=self.config.min_similarity_threshold
                )

                # Re-rank if enabled
                if self.config.enable_reranking and results:
                    results = await self._rerank(question, results)

                # Filter and limit
                filtered = [r for r in results if r["score"] >= self.config.min_similarity_threshold]

                return filtered[:5]  # Top 5

            except Exception as e:
                last_error = e
                self.logger.warning(f"Retrieval attempt {attempt + 1} failed: {str(e)}")

                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

        self.logger.error(f"All retrieval attempts failed: {str(last_error)}")
        return []

    async def _embed_with_fallback(self, text: str) -> np.ndarray:
        """Embed with fallback to a different model"""
        try:
            return self.embedding_model.encode([text])[0]
        except Exception as e:
            self.logger.warning(f"Primary embedding failed: {str(e)}, using fallback")
            # Fallback to simpler model
            return self.fallback_embedding_model.encode([text])[0]

    async def _generate_answer(self, question: str, documents: List[Dict]) -> str:
        """Generate answer from documents"""
        # Assemble context
        context = self._assemble_context(documents)

        # Create prompt
        prompt = f"""Context:
{context}

Question: {question}

Provide a comprehensive answer based on the context. Include citations [1], [2], etc.

Answer:"""

        # Generate
        answer = await self.llm.generate_async(prompt)

        return answer

    def _assemble_context(self, documents: List[Dict]) -> str:
        """Assemble context within token limits"""
        context_parts = []
        total_tokens = 0

        for i, doc in enumerate(documents):
            text = doc["text"]
            tokens = len(self.tokenizer.encode(text))

            if total_tokens + tokens > self.config.max_context_tokens:
                break

            context_parts.append(f"[{i+1}] {text}")
            total_tokens += tokens

        return "\n\n".join(context_parts)

    def _validate_response(self, answer: str) -> bool:
        """Validate generated response"""
        if not answer or len(answer.strip()) < 10:
            return False

        # Check for common failure patterns
        failure_patterns = [
            "I don't know",
            "I cannot answer",
            "No information available"
        ]

        return not any(pattern.lower() in answer.lower() for pattern in failure_patterns)

    def _fallback_response(self, question: str) -> Dict:
        """Fallback when the main pipeline fails: plain LLM call without retrieval"""
        self.logger.info("Using fallback response")

        prompt = f"Answer this question concisely: {question}"
        answer = self.llm(prompt)

        return {
            "answer": answer,
            "sources": [],
            "status": "fallback",
            "metadata": {"fallback": True}
        }

    def _get_cache_key(self, question: str, user_id: str) -> str:
        """Generate cache key"""
        import hashlib
        content = f"{question}|{user_id}"
        return hashlib.md5(content.encode()).hexdigest()
```

Monitoring Dashboard:

```python
from prometheus_client import start_http_server, generate_latest

# Start metrics server
start_http_server(8000)

# Grafana queries:
# - Request rate: rate(rag_requests_total[5m])
# - Error rate: rate(rag_errors_total[5m]) / rate(rag_requests_total[5m])
# - Latency p95: histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m]))
# - Cache hit rate: rate(rag_cache_hits_total[5m]) / rate(rag_requests_total[5m])
```

Key Production Features:

  • ✅ Input validation & sanitization
  • ✅ Caching (multi-layer)
  • ✅ Retry logic with exponential backoff
  • ✅ Timeouts
  • ✅ Fallback mechanisms
  • ✅ Comprehensive logging
  • ✅ Metrics & monitoring
  • ✅ Error handling
  • ✅ Rate limiting (not shown in the code above; see the token-bucket sketch after this list)
  • ✅ A/B testing capability
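
Since rate limiting is listed but not implemented above, here is a minimal token-bucket sketch (the TokenBucketRateLimiter class and its parameters are illustrative) that could sit in front of the query() endpoint:

```python
import time
import threading

class TokenBucketRateLimiter:
    """Allow up to `rate` requests per second with bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate                # tokens refilled per second
        self.capacity = capacity        # maximum burst size
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def allow(self) -> bool:
        with self.lock:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

# Usage in front of the RAG endpoint (per-user limiters would be kept in a dict)
limiter = TokenBucketRateLimiter(rate=5, capacity=10)

def rate_limited_query(rag, question: str, user_id: str):
    if not limiter.allow():
        return {"error": "Rate limit exceeded, retry later", "status": "rate_limited"}
    return rag.query(question, user_id)
```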

Summary

Hard LLM/Agent topics require:

  • Multi-agent systems: Orchestration, communication, coordination
  • Production optimization: Caching, batching, model routing
  • Robust RAG: Monitoring, failure handling, fallbacks
  • System design: Scalability, reliability, observability

Production Checklist:

  • ✅ Comprehensive error handling
  • ✅ Retry logic & timeouts
  • ✅ Caching strategies
  • ✅ Monitoring & alerting
  • ✅ Logging & debugging
  • ✅ Fallback mechanisms
  • ✅ Input validation
  • ✅ Rate limiting
  • ✅ Cost optimization
  • ✅ A/B testing (see the routing sketch after this checklist)
  • ✅ Documentation
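
A/B testing appears on the checklist but not in the code above; one minimal approach, sketched here with illustrative names, is deterministic user bucketing between a control and a candidate model configuration:

```python
import hashlib

# Hypothetical variant registry: variant name -> callable that answers a prompt
VARIANTS = {
    "control":   lambda prompt: f"[gpt-3.5 answer to] {prompt}",   # e.g. existing chain
    "candidate": lambda prompt: f"[gpt-4 answer to] {prompt}",     # e.g. new prompt/model
}

def assign_variant(user_id: str, candidate_fraction: float = 0.1) -> str:
    """Deterministically bucket users so each user always sees the same variant."""
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return "candidate" if bucket < candidate_fraction * 100 else "control"

def answer_with_ab_test(user_id: str, prompt: str) -> dict:
    variant = assign_variant(user_id, candidate_fraction=0.1)
    answer = VARIANTS[variant](prompt)
    # Log the variant alongside quality/latency/cost metrics for later comparison
    return {"variant": variant, "answer": answer}

print(answer_with_ab_test("user-42", "What is Python?"))
```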

Key Skills:

  • Design distributed systems
  • Optimize for latency & cost
  • Handle failures gracefully
  • Monitor & debug production issues
  • Scale to high traffic
