Integrating AI into Production Applications: A Practical Guide
The rush to integrate AI into applications has been breathtaking, but many teams are discovering that moving from prototype to production is far harder than they expected. I’ve built several production AI systems at scale, and the gap between a working demo and a reliable production system is enormous.
This guide is about more than calling AI APIs. It is about architecting systems that handle the unique challenges AI introduces: latency, cost, unpredictability, and the need for fallback mechanisms.
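To make the fallback idea concrete, here is a minimal sketch of a wrapper that retries a flaky call with exponential backoff and then degrades to a fallback value. The names `call_with_fallback`, `primary`, and `fallback` are hypothetical and not part of any SDK, and the sketch assumes every exception is retryable; real code would likely retry only timeouts, rate limits, and server errors.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_fallback(
    primary: Callable[[], T],
    fallback: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Try `primary` up to `max_attempts` times, then return `fallback()`."""
    for attempt in range(max_attempts):
        try:
            return primary()
        except Exception:
            # Assumption: every exception is retryable. Narrow this in real code.
            if attempt < max_attempts - 1:
                # Exponential backoff: base_delay, 2x, 4x, ...
                sleep(base_delay * (2 ** attempt))
    return fallback()
```

Passing `sleep` as a parameter keeps the wrapper easy to test without real delays; the fallback could be a cached answer, a cheaper model, or a static message.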
LLM Integration Patterns
The most common approach is calling LLM APIs (OpenAI, Anthropic, etc.), but the devil is in the architectural details.
Basic Integration: Function Calling
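Modern LLMs support structured function calling (also called tool use): the model returns a structured request naming a tool and its arguments, and your code executes it. The model's choice of tool is still probabilistic, but the tool itself runs as ordinary deterministic code.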
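Because the arguments come from the model, it is worth validating them before running anything. The following is a minimal sketch of a registry-based dispatcher that checks required fields and reports failures back to the model as an error result. `TOOL_REGISTRY` and `run_tool` are hypothetical names introduced for illustration, and the handler is a placeholder; the `tool_result` shape with `is_error` follows the Anthropic Messages API.

```python
import json
from typing import Any, Callable

ToolHandler = Callable[[dict[str, Any]], Any]


def search_knowledge_base(args: dict[str, Any]) -> dict[str, Any]:
    # Placeholder handler; a real one would query your search backend.
    return {"results": [], "query": args["query"], "limit": args.get("limit", 5)}


# Hypothetical registry: tool name -> (required fields, handler).
TOOL_REGISTRY: dict[str, tuple[list[str], ToolHandler]] = {
    "search_knowledge_base": (["query"], search_knowledge_base),
}


def run_tool(tool_use_id: str, name: str, args: dict[str, Any]) -> dict[str, Any]:
    """Validate and execute one tool call, returning a tool_result block."""

    def result(payload: Any, is_error: bool = False) -> dict[str, Any]:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": json.dumps(payload),
            "is_error": is_error,
        }

    if name not in TOOL_REGISTRY:
        return result({"error": f"unknown tool: {name}"}, is_error=True)
    required, handler = TOOL_REGISTRY[name]
    missing = [field for field in required if field not in args]
    if missing:
        return result({"error": f"missing fields: {missing}"}, is_error=True)
    return result(handler(args))
```

Returning an error result instead of raising lets the model see what went wrong and retry with corrected arguments.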
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
interface ToolDefinition {
name: string;
description: string;
input_schema: any;
}
// Define what the AI can do
const tools: ToolDefinition[] = [
{
name: 'search_knowledge_base',
description: 'Search the company knowledge base for information',
input_schema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Search query' },
limit: { type: 'number', description: 'Number of results' },
},
required: ['query'],
},
},
{
name: 'create_ticket',
description: 'Create a support ticket for the user',
input_schema: {
type: 'object',
properties: {
title: { type: 'string' },
description: { type: 'string' },
priority: { type: 'string', enum: ['low', 'medium', 'high'] },
},
required: ['title', 'description'],
},
},
];
async function handleUserMessage(userMessage: string): Promise<string> {
const messages = [
{
role: 'user',
content: userMessage,
},
];
let response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: 'You are a helpful support assistant. Use available tools to help users.',
tools,
messages,
});
// Agentic loop: keep processing until we get a final response
while (response.stop_reason === 'tool_use') {
const toolUseBlocks = response.content.filter(
(block) => block.type === 'tool_use'
);
const toolResults = [];
for (const toolUse of toolUseBlocks) {
let result: any;
if (toolUse.name === 'search_knowledge_base') {
result = await searchKnowledgeBase(
toolUse.input.query,
toolUse.input.limit || 5
);
} else if (toolUse.name === 'create_ticket') {
result = await createSupportTicket(
toolUse.input.title,
toolUse.input.description,
toolUse.input.priority || 'medium'
);
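} else {
// Unknown tool: return an explicit error instead of leaving result undefined
result = { error: `Unknown tool: ${toolUse.name}` };
}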
toolResults.push({
type: 'tool_result',
tool_use_id: toolUse.id,
content: JSON.stringify(result),
});
}
// Continue conversation with tool results
messages.push({ role: 'assistant', content: response.content });
messages.push({ role: 'user', content: toolResults });
response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: 'You are a helpful support assistant. Use available tools to help users.',
tools,
messages,
});
}
// Extract final text response
const textBlock = response.content.find((block) => block.type === 'text');
return textBlock?.text || 'I could not generate a response.';
}
async function searchKnowledgeBase(query: string, limit: number): Promise<any> {
// Implementation: search your knowledge base
return {
results: [
{ title: 'How to reset password', content: '...' },
{ title: 'Account settings guide', content: '...' },
],
};
}
async function createSupportTicket(
title: string,
description: string,
priority: string
): Promise<any> {
// Implementation: create ticket in your system
return { ticketId: 'TICKET-12345', status: 'created' };
}

// Spring Boot + Spring AI (or direct HTTP) function calling example
import org.springframework.ai.anthropic.AnthropicChatClient;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
@Service
public class SupportAgentService {
private final AnthropicChatClient chatClient;
private final ObjectMapper objectMapper = new ObjectMapper();
public SupportAgentService(AnthropicChatClient chatClient) {
this.chatClient = chatClient;
}
public record ToolDefinition(String name, String description, Map<String, Object> inputSchema) {}
public record SearchResult(String title, String content) {}
public record TicketResult(String ticketId, String status) {}
public String handleUserMessage(String userMessage) {
// In Spring AI, tools/functions are registered as @Bean FunctionCallback
// This shows the conceptual agentic loop structure
List<Map<String, Object>> messages = new ArrayList<>();
messages.add(Map.of("role", "user", "content", userMessage));
String systemPrompt = "You are a helpful support assistant. Use available tools to help users.";
// Invoke chat with function callbacks registered in context
String response = chatClient.call(
new Prompt(new UserMessage(userMessage))
).getResult().getOutput().getContent();
return response != null ? response : "I could not generate a response.";
}
public List<SearchResult> searchKnowledgeBase(String query, int limit) {
// Implementation: search your knowledge base
return List.of(
new SearchResult("How to reset password", "..."),
new SearchResult("Account settings guide", "...")
);
}
public TicketResult createSupportTicket(String title, String description, String priority) {
// Implementation: create ticket in your system
return new TicketResult("TICKET-12345", "created");
}
}

import anthropic
import json
from typing import Any

client = anthropic.Anthropic()

tools = [
    {
        "name": "search_knowledge_base",
        "description": "Search the company knowledge base for information",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "limit": {"type": "number", "description": "Number of results"},
            },
            "required": ["query"],
        },
    },
    {
        "name": "create_ticket",
        "description": "Create a support ticket for the user",
        "input_schema": {
            "type": "object",
            "properties": {
                "title": {"type": "string"},
                "description": {"type": "string"},
                "priority": {"type": "string", "enum": ["low", "medium", "high"]},
            },
            "required": ["title", "description"],
        },
    },
]


def search_knowledge_base(query: str, limit: int = 5) -> dict:
    # Implementation: search your knowledge base
    return {
        "results": [
            {"title": "How to reset password", "content": "..."},
            {"title": "Account settings guide", "content": "..."},
        ]
    }


def create_support_ticket(title: str, description: str, priority: str = "medium") -> dict:
    # Implementation: create ticket in your system
    return {"ticketId": "TICKET-12345", "status": "created"}


def handle_user_message(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        system="You are a helpful support assistant. Use available tools to help users.",
        tools=tools,
        messages=messages,
    )
    # Agentic loop: keep processing until we get a final response
    while response.stop_reason == "tool_use":
        tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
        tool_results = []
        for tool_use in tool_use_blocks:
            if tool_use.name == "search_knowledge_base":
                result = search_knowledge_base(
                    tool_use.input["query"],
                    tool_use.input.get("limit", 5),
                )
            elif tool_use.name == "create_ticket":
                result = create_support_ticket(
                    tool_use.input["title"],
                    tool_use.input["description"],
                    tool_use.input.get("priority", "medium"),
                )
            else:
                result = {}
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": json.dumps(result),
            })
        # Continue conversation with tool results
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})
        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system="You are a helpful support assistant. Use available tools to help users.",
            tools=tools,
            messages=messages,
        )
    # Extract final text response
    text_block = next((b for b in response.content if b.type == "text"), None)
    return text_block.text if text_block else "I could not generate a response."

using Anthropic.SDK;
using Anthropic.SDK.Messaging;
using System.Text.Json;
public class SupportAgentService
{
private readonly AnthropicClient _client;
public SupportAgentService()
{
_client = new AnthropicClient(Environment.GetEnvironmentVariable("ANTHROPIC_API_KEY")!);
}
private static readonly List<Tool> Tools = new()
{
new Tool
{
Name = "search_knowledge_base",
Description = "Search the company knowledge base for information",
InputSchema = new InputSchema
{
Type = "object",
Properties = new Dictionary<string, SchemaProperty>
{
["query"] = new() { Type = "string", Description = "Search query" },
["limit"] = new() { Type = "number", Description = "Number of results" },
},
Required = new[] { "query" }
}
},
new Tool
{
Name = "create_ticket",
Description = "Create a support ticket for the user",
InputSchema = new InputSchema
{
Type = "object",
Properties = new Dictionary<string, SchemaProperty>
{
["title"] = new() { Type = "string" },
["description"] = new() { Type = "string" },
["priority"] = new() { Type = "string", Enum = new[] { "low", "medium", "high" } },
},
Required = new[] { "title", "description" }
}
}
};
public async Task<string> HandleUserMessageAsync(string userMessage)
{
var messages = new List<Message>
{
new() { Role = "user", Content = userMessage }
};
var response = await _client.Messages.CreateAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 1024,
System = "You are a helpful support assistant. Use available tools to help users.",
Tools = Tools,
Messages = messages
});
// Agentic loop: keep processing until we get a final response
while (response.StopReason == "tool_use")
{
var toolResults = new List<ToolResultContent>();
foreach (var block in response.Content.OfType<ToolUseContent>())
{
object result = block.Name switch
{
"search_knowledge_base" => SearchKnowledgeBase(
block.Input.GetProperty("query").GetString()!,
block.Input.TryGetProperty("limit", out var l) ? l.GetInt32() : 5),
"create_ticket" => CreateSupportTicket(
block.Input.GetProperty("title").GetString()!,
block.Input.GetProperty("description").GetString()!,
block.Input.TryGetProperty("priority", out var p) ? p.GetString()! : "medium"),
_ => new { }
};
toolResults.Add(new ToolResultContent
{
ToolUseId = block.Id,
Content = JsonSerializer.Serialize(result)
});
}
messages.Add(new Message { Role = "assistant", Content = response.Content });
messages.Add(new Message { Role = "user", Content = toolResults });
response = await _client.Messages.CreateAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 1024,
System = "You are a helpful support assistant. Use available tools to help users.",
Tools = Tools,
Messages = messages
});
}
var textBlock = response.Content.OfType<TextContent>().FirstOrDefault();
return textBlock?.Text ?? "I could not generate a response.";
}
private object SearchKnowledgeBase(string query, int limit) =>
new { results = new[] {
new { title = "How to reset password", content = "..." },
new { title = "Account settings guide", content = "..." }
}};
private object CreateSupportTicket(string title, string description, string priority) =>
new { ticketId = "TICKET-12345", status = "created" };
}

Streaming Responses for Better UX
For long-form content, streaming provides immediate feedback to users.
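The endpoints in this section all speak Server-Sent Events: each chunk is sent as a `data:` line followed by a blank line, and a final `[DONE]` message marks the end. As a rough sketch of that wire format, the helpers below frame chunks and parse them back on the receiving side. The function names are hypothetical, and `[DONE]` is this article's convention rather than part of the SSE specification.

```python
import json
from typing import Iterable, Iterator


def format_sse(payload: dict) -> str:
    """Frame one JSON payload as a Server-Sent Events message."""
    return f"data: {json.dumps(payload)}\n\n"


def to_sse(chunks: Iterable[str]) -> str:
    """Simulate a streamed response body from a sequence of text chunks."""
    return "".join(format_sse({"text": c}) for c in chunks) + "data: [DONE]\n\n"


def parse_sse(raw: str) -> Iterator[dict]:
    """Yield decoded payloads from an SSE body until the [DONE] sentinel."""
    for event in raw.split("\n\n"):
        if not event.startswith("data: "):
            continue  # skip the empty trailer and any non-data lines
        data = event[len("data: "):]
        if data == "[DONE]":
            return
        yield json.loads(data)
```

JSON-encoding each chunk matters: a raw newline inside the model's text would otherwise be read as the end of an event, whereas `json.dumps` escapes it.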
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
async function* streamCompletion(prompt: string): AsyncGenerator<string> {
const stream = await client.messages.stream({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 2048,
messages: [{ role: 'user', content: prompt }],
});
for await (const chunk of stream) {
if (
chunk.type === 'content_block_delta' &&
chunk.delta.type === 'text_delta'
) {
yield chunk.delta.text;
}
}
}
// Express endpoint with streaming
app.get('/api/generate', async (req, res) => {
const { prompt } = req.query;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
try {
for await (const chunk of streamCompletion(prompt as string)) {
res.write(`data: ${JSON.stringify({ text: chunk })}\n\n`);
}
res.write(`data: [DONE]\n\n`);
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ error: error instanceof Error ? error.message : String(error) })}\n\n`);
res.end();
}
});

import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
@RequestMapping("/api")
public class GenerateController {
private final AnthropicStreamClient anthropicClient;
public GenerateController(AnthropicStreamClient anthropicClient) {
this.anthropicClient = anthropicClient;
}
@GetMapping("/generate")
public SseEmitter generate(@RequestParam String prompt) {
SseEmitter emitter = new SseEmitter(120_000L);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
// Stream from Anthropic API using reactive client
Flux<String> tokenStream = anthropicClient.streamCompletion(
"claude-3-5-sonnet-20241022", 2048, prompt
);
tokenStream.subscribe(
chunk -> {
try {
emitter.send(SseEmitter.event()
.data("{\"text\":\"" + escapeJson(chunk) + "\"}"));
} catch (IOException e) {
emitter.completeWithError(e);
}
},
error -> {
try {
emitter.send(SseEmitter.event()
.data("{\"error\":\"" + escapeJson(String.valueOf(error.getMessage())) + "\"}"));
emitter.complete();
} catch (IOException ignored) {}
},
() -> {
try {
emitter.send(SseEmitter.event().data("[DONE]"));
emitter.complete();
} catch (IOException ignored) {}
}
);
} catch (Exception e) {
emitter.completeWithError(e);
}
});
executor.shutdown();
return emitter;
}
private String escapeJson(String text) {
return text.replace("\\", "\\\\").replace("\"", "\\\"")
.replace("\n", "\\n").replace("\r", "\\r");
}
}

import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()
# Async client so that streaming does not block the event loop
client = anthropic.AsyncAnthropic()


async def stream_completion(prompt: str):
    async with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:
            yield f"data: {json.dumps({'text': text})}\n\n"
    yield "data: [DONE]\n\n"


@app.get("/api/generate")
async def generate(prompt: str):
    return StreamingResponse(
        stream_completion(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

using Anthropic.SDK;
using Anthropic.SDK.Messaging;
using Microsoft.AspNetCore.Mvc;
using System.Text;
using System.Text.Json;
[ApiController]
[Route("api")]
public class GenerateController : ControllerBase
{
private readonly AnthropicClient _client;
public GenerateController(AnthropicClient client)
{
_client = client;
}
[HttpGet("generate")]
public async Task Generate([FromQuery] string prompt)
{
Response.Headers["Content-Type"] = "text/event-stream";
Response.Headers["Cache-Control"] = "no-cache";
Response.Headers["Connection"] = "keep-alive";
try
{
var stream = _client.Messages.StreamAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 2048,
Messages = new List<Message>
{
new() { Role = "user", Content = prompt }
}
});
await foreach (var chunk in stream)
{
if (chunk is ContentBlockDeltaEvent delta &&
delta.Delta is TextDelta textDelta)
{
var json = JsonSerializer.Serialize(new { text = textDelta.Text });
var line = Encoding.UTF8.GetBytes($"data: {json}\n\n");
await Response.Body.WriteAsync(line);
await Response.Body.FlushAsync();
}
}
var done = Encoding.UTF8.GetBytes("data: [DONE]\n\n");
await Response.Body.WriteAsync(done);
}
catch (Exception ex)
{
var error = JsonSerializer.Serialize(new { error = ex.Message });
var line = Encoding.UTF8.GetBytes($"data: {error}\n\n");
await Response.Body.WriteAsync(line);
}
}
}

RAG Architecture: Grounding AI in Your Data
RAG (Retrieval-Augmented Generation) reduces hallucinations by grounding AI responses in your actual data: relevant documents are retrieved first and passed to the model as context.
┌──────────────────────────────────────────┐
│ User Query: "What is our policy?"        │
└──────────────────┬───────────────────────┘
                   │
       ┌───────────┴────────────┐
       │ 1. Retrieve Relevant   │
       │    Documents from Store│
       └───────────┬────────────┘
                   │
              (Documents)
                   │
       ┌───────────┴────────────┐
       │ 2. Send to LLM with    │
       │    Context + Query     │
       └───────────┬────────────┘
                   │
              (Response)
                   │
       ┌───────────┴────────────┐
       │ 3. Return Grounded     │
       │    Answer with Sources │
       └───────────┬────────────┘
                   │
           (Final Response)
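Steps 1 and 2 of the flow can be sketched without any external service. The example below is a toy, assuming hand-written three-dimensional vectors in place of real embeddings and an in-memory list in place of a vector database; `retrieve`, `build_prompt`, and `STORE` are hypothetical names used only for illustration.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(query_vec, store, top_k=2):
    """Step 1: return the top_k (score, text) pairs most similar to query_vec."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in store]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:top_k]


def build_prompt(question: str, passages: list[str]) -> str:
    """Step 2: number the passages so the model can cite them as [1], [2]."""
    context = "\n\n".join(f"[{i + 1}] {p}" for i, p in enumerate(passages))
    return f"Context:\n{context}\n\nQuestion: {question}"


# Toy 3-dimensional "embeddings"; real ones come from an embedding model.
STORE = [
    ("Refunds are issued within 14 days.", [0.9, 0.1, 0.0]),
    ("Passwords must be rotated yearly.", [0.0, 0.2, 0.9]),
    ("Refund requests need a receipt.", [0.8, 0.3, 0.1]),
]

hits = retrieve([1.0, 0.0, 0.0], STORE, top_k=2)
prompt = build_prompt("What is the refund policy?", [text for _, text in hits])
```

With the query vector pointing along the first axis, the two refund passages score highest and the password passage is left out of the prompt.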
import { Pinecone } from '@pinecone-database/pinecone';
import Anthropic from '@anthropic-ai/sdk';
const pinecone = new Pinecone({ apiKey: process.env.PINECONE_API_KEY });
const client = new Anthropic();
// Vector embedding (using OpenAI for embeddings, Anthropic for generation)
import OpenAI from 'openai';
const openai = new OpenAI();
interface Document {
id: string;
content: string;
source: string;
metadata: Record<string, any>;
}
async function embedText(text: string): Promise<number[]> {
const response = await openai.embeddings.create({
model: 'text-embedding-3-small',
input: text,
encoding_format: 'float',
});
return response.data[0].embedding;
}
class RAGSystem {
private indexName = 'company-documents';
// Index documents into vector store
async indexDocuments(documents: Document[]): Promise<void> {
const index = pinecone.Index(this.indexName);
for (const doc of documents) {
// Chunk long documents; each chunk is embedded separately below
const chunks = this.chunkDocument(doc.content, 1000);
for (let i = 0; i < chunks.length; i++) {
const chunkEmbedding = await embedText(chunks[i]);
await index.upsert([
{
id: `${doc.id}_chunk_${i}`,
values: chunkEmbedding,
metadata: {
documentId: doc.id,
source: doc.source,
chunkIndex: i,
content: chunks[i],
...doc.metadata,
},
},
]);
}
}
}
// Retrieve relevant context for a query
async retrieveContext(query: string, topK: number = 5): Promise<Document[]> {
const queryEmbedding = await embedText(query);
const index = pinecone.Index(this.indexName);
const results = await index.query({
vector: queryEmbedding,
topK,
includeMetadata: true,
});
return results.matches.map((match: any) => ({
id: match.id,
content: match.metadata.content,
source: match.metadata.source,
metadata: match.metadata,
}));
}
// Generate response grounded in retrieved context
async generateResponse(userQuery: string): Promise<{
response: string;
sources: Document[];
}> {
// Retrieve relevant documents
const context = await this.retrieveContext(userQuery);
if (context.length === 0) {
return {
response: 'I could not find relevant information in our knowledge base.',
sources: [],
};
}
// Construct prompt with context
const contextText = context
.map((doc, i) => `[${i + 1}] Source: ${doc.source}\n${doc.content}`)
.join('\n\n');
const systemPrompt = `You are a helpful assistant. Answer the user's question based on the provided context.
If the answer is not in the context, say so. Always cite your sources by referencing the source numbers [1], [2], etc.
Context:
${contextText}`;
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
system: systemPrompt,
messages: [{ role: 'user', content: userQuery }],
});
const textBlock = response.content.find((block) => block.type === 'text');
return {
response: textBlock?.text || 'No response generated',
sources: context,
};
}
private chunkDocument(content: string, chunkSize: number): string[] {
const chunks = [];
for (let i = 0; i < content.length; i += chunkSize) {
chunks.push(content.substring(i, i + chunkSize));
}
return chunks;
}
}

import io.pinecone.clients.Index;
import io.pinecone.clients.Pinecone;
import org.springframework.ai.anthropic.AnthropicChatClient;
import org.springframework.ai.openai.OpenAiEmbeddingClient;
import org.springframework.stereotype.Service;
import java.util.*;
@Service
public class RAGSystem {
private final Pinecone pinecone;
private final AnthropicChatClient chatClient;
private final OpenAiEmbeddingClient embeddingClient;
private static final String INDEX_NAME = "company-documents";
public RAGSystem(Pinecone pinecone, AnthropicChatClient chatClient,
OpenAiEmbeddingClient embeddingClient) {
this.pinecone = pinecone;
this.chatClient = chatClient;
this.embeddingClient = embeddingClient;
}
public record Document(String id, String content, String source, Map<String, Object> metadata) {}
public record RAGResponse(String response, List<Document> sources) {}
public List<float[]> embedText(String text) {
return embeddingClient.embed(List.of(text));
}
public void indexDocuments(List<Document> documents) {
Index index = pinecone.getIndexConnection(INDEX_NAME);
for (Document doc : documents) {
List<String> chunks = chunkDocument(doc.content(), 1000);
for (int i = 0; i < chunks.size(); i++) {
List<float[]> embedding = embedText(chunks.get(i));
Map<String, Object> metadata = new HashMap<>(doc.metadata());
metadata.put("documentId", doc.id());
metadata.put("source", doc.source());
metadata.put("chunkIndex", i);
metadata.put("content", chunks.get(i));
index.upsert(doc.id() + "_chunk_" + i, embedding.get(0), metadata);
}
}
}
public List<Document> retrieveContext(String query, int topK) {
List<float[]> queryEmbedding = embedText(query);
Index index = pinecone.getIndexConnection(INDEX_NAME);
var results = index.query(topK, queryEmbedding.get(0), null, null, true, true);
return results.getMatchesList().stream().map(match -> {
var meta = match.getMetadata().getFieldsMap();
return new Document(
match.getId(),
meta.get("content").getStringValue(),
meta.get("source").getStringValue(),
Map.of()
);
}).toList();
}
public RAGResponse generateResponse(String userQuery) {
List<Document> context = retrieveContext(userQuery, 5);
if (context.isEmpty()) {
return new RAGResponse(
"I could not find relevant information in our knowledge base.", List.of()
);
}
StringBuilder contextText = new StringBuilder();
for (int i = 0; i < context.size(); i++) {
contextText.append("[").append(i + 1).append("] Source: ")
.append(context.get(i).source()).append("\n")
.append(context.get(i).content()).append("\n\n");
}
String systemPrompt = """
You are a helpful assistant. Answer the user's question based on the provided context.
If the answer is not in the context, say so. Always cite your sources [1], [2], etc.
Context:
""" + contextText;
// Spring AI prompt with system message
String response = chatClient.call(systemPrompt + "\n\nUser: " + userQuery);
return new RAGResponse(response, context);
}
private List<String> chunkDocument(String content, int chunkSize) {
List<String> chunks = new ArrayList<>();
for (int i = 0; i < content.length(); i += chunkSize) {
chunks.add(content.substring(i, Math.min(i + chunkSize, content.length())));
}
return chunks;
}
}

import os
from pinecone import Pinecone
import anthropic
from openai import OpenAI
from dataclasses import dataclass, field
from typing import Any

# Read the key from the environment rather than hard-coding it
pinecone_client = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
anthropic_client = anthropic.Anthropic()
openai_client = OpenAI()
INDEX_NAME = "company-documents"


@dataclass
class Document:
    id: str
    content: str
    source: str
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class RAGResponse:
    response: str
    sources: list[Document]


def embed_text(text: str) -> list[float]:
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
        encoding_format="float",
    )
    return response.data[0].embedding


class RAGSystem:
    def __init__(self):
        self.index = pinecone_client.Index(INDEX_NAME)

    def chunk_document(self, content: str, chunk_size: int) -> list[str]:
        return [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    def index_documents(self, documents: list[Document]) -> None:
        for doc in documents:
            chunks = self.chunk_document(doc.content, 1000)
            vectors = []
            for i, chunk in enumerate(chunks):
                embedding = embed_text(chunk)
                vectors.append({
                    "id": f"{doc.id}_chunk_{i}",
                    "values": embedding,
                    "metadata": {
                        "documentId": doc.id,
                        "source": doc.source,
                        "chunkIndex": i,
                        "content": chunk,
                        **doc.metadata,
                    },
                })
            # Upsert all chunks of one document in a single request
            self.index.upsert(vectors=vectors)

    def retrieve_context(self, query: str, top_k: int = 5) -> list[Document]:
        query_embedding = embed_text(query)
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
        )
        return [
            Document(
                id=match["id"],
                content=match["metadata"]["content"],
                source=match["metadata"]["source"],
                metadata=match["metadata"],
            )
            for match in results["matches"]
        ]

    def generate_response(self, user_query: str) -> RAGResponse:
        context = self.retrieve_context(user_query)
        if not context:
            return RAGResponse(
                response="I could not find relevant information in our knowledge base.",
                sources=[],
            )
        context_text = "\n\n".join(
            f"[{i + 1}] Source: {doc.source}\n{doc.content}"
            for i, doc in enumerate(context)
        )
        system_prompt = f"""You are a helpful assistant. Answer the user's question based on the provided context.
If the answer is not in the context, say so. Always cite your sources by referencing [1], [2], etc.
Context:
{context_text}"""
        response = anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system=system_prompt,
            messages=[{"role": "user", "content": user_query}],
        )
        text_block = next((b for b in response.content if b.type == "text"), None)
        return RAGResponse(
            response=text_block.text if text_block else "No response generated",
            sources=context,
        )

using Anthropic.SDK;
using Anthropic.SDK.Messaging;
using OpenAI;
using Pinecone;
using System.Text;
public class RAGSystem
{
private readonly PineconeClient _pinecone;
private readonly AnthropicClient _anthropic;
private readonly OpenAIClient _openai;
private const string IndexName = "company-documents";
public RAGSystem(PineconeClient pinecone, AnthropicClient anthropic, OpenAIClient openai)
{
_pinecone = pinecone;
_anthropic = anthropic;
_openai = openai;
}
public record Document(string Id, string Content, string Source, Dictionary<string, object> Metadata);
public record RAGResponse(string Response, List<Document> Sources);
public async Task<float[]> EmbedTextAsync(string text)
{
var response = await _openai.GetEmbeddingsClient().GenerateEmbeddingAsync(text);
return response.Value.Vector.ToArray();
}
public async Task IndexDocumentsAsync(List<Document> documents)
{
var index = _pinecone.GetIndex(IndexName);
foreach (var doc in documents)
{
var chunks = ChunkDocument(doc.Content, 1000);
for (int i = 0; i < chunks.Count; i++)
{
var embedding = await EmbedTextAsync(chunks[i]);
var metadata = new Dictionary<string, MetadataValue>(doc.Metadata.Select(kv =>
new KeyValuePair<string, MetadataValue>(kv.Key, new MetadataValue(kv.Value?.ToString()))))
{
["documentId"] = new MetadataValue(doc.Id),
["source"] = new MetadataValue(doc.Source),
["chunkIndex"] = new MetadataValue(i.ToString()),
["content"] = new MetadataValue(chunks[i]),
};
await index.UpsertAsync(new[] {
new Vector { Id = $"{doc.Id}_chunk_{i}", Values = embedding, Metadata = metadata }
});
}
}
}
public async Task<List<Document>> RetrieveContextAsync(string query, uint topK = 5)
{
var queryEmbedding = await EmbedTextAsync(query);
var index = _pinecone.GetIndex(IndexName);
var results = await index.QueryAsync(new QueryRequest
{
Vector = queryEmbedding,
TopK = topK,
IncludeMetadata = true
});
return results.Matches.Select(match => new Document(
match.Id,
match.Metadata["content"].Inner.ToString()!,
match.Metadata["source"].Inner.ToString()!,
new Dictionary<string, object>()
)).ToList();
}
public async Task<RAGResponse> GenerateResponseAsync(string userQuery)
{
var context = await RetrieveContextAsync(userQuery);
if (context.Count == 0)
{
return new RAGResponse(
"I could not find relevant information in our knowledge base.", new List<Document>()
);
}
var contextText = new StringBuilder();
for (int i = 0; i < context.Count; i++)
{
contextText.AppendLine($"[{i + 1}] Source: {context[i].Source}");
contextText.AppendLine(context[i].Content);
contextText.AppendLine();
}
var systemPrompt = $"""
You are a helpful assistant. Answer the user's question based on the provided context.
If the answer is not in the context, say so. Always cite your sources [1], [2], etc.
Context:
{contextText}
""";
var response = await _anthropic.Messages.CreateAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 1024,
System = systemPrompt,
Messages = new List<Message> { new() { Role = "user", Content = userQuery } }
});
var textBlock = response.Content.OfType<TextContent>().FirstOrDefault();
return new RAGResponse(textBlock?.Text ?? "No response generated", context);
}
private List<string> ChunkDocument(string content, int chunkSize)
{
var chunks = new List<string>();
for (int i = 0; i < content.Length; i += chunkSize)
chunks.Add(content.Substring(i, Math.Min(chunkSize, content.Length - i)));
return chunks;
}
}

Cost Optimization Strategies
AI API costs scale with usage: you pay per input and output token on every request. A single careless implementation can cost thousands of dollars a month.
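A quick back-of-the-envelope calculation shows how that adds up. The sketch below uses the same per-token prices as the pricing tables in this section, which should be checked against the provider's current price list; the function names and the traffic figures (10,000 requests a day, 2,000 input and 500 output tokens each) are assumptions chosen for illustration.

```python
# USD per token, mirroring the pricing tables used in this section.
PRICE_PER_TOKEN = {
    "claude-3-5-sonnet-20241022": {"input": 0.003 / 1000, "output": 0.015 / 1000},
    "claude-3-5-haiku-20241022": {"input": 0.001 / 1000, "output": 0.005 / 1000},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of a single request in USD."""
    price = PRICE_PER_TOKEN[model]
    return input_tokens * price["input"] + output_tokens * price["output"]


def monthly_cost(model, requests_per_day, input_tokens, output_tokens, days=30):
    """Projected monthly spend for a steady request volume."""
    return estimate_cost(model, input_tokens, output_tokens) * requests_per_day * days


# Hypothetical workload: 10,000 requests/day, 2,000 input + 500 output tokens each.
sonnet = monthly_cost("claude-3-5-sonnet-20241022", 10_000, 2000, 500)
haiku = monthly_cost("claude-3-5-haiku-20241022", 10_000, 2000, 500)
```

Under these assumptions a request costs about $0.0135 on the larger model, or roughly $4,050 a month, versus roughly $1,350 on the smaller one, which is why routing simple queries to a cheaper model is worth the effort.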
// Cost tracking and limiting
interface CostTracker {
inputTokens: number;
outputTokens: number;
estimatedCost: number;
}
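// USD per token (list prices are quoted per 1,000 tokens)
const PRICING: Record<string, { input: number; output: number }> = {
  'claude-3-5-sonnet-20241022': {
    input: 0.003 / 1000,
    output: 0.015 / 1000,
  },
  // selectModel() can return Haiku, so it needs an entry as well
  'claude-3-5-haiku-20241022': {
    input: 0.001 / 1000,
    output: 0.005 / 1000,
  },
};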
class CostAwareLLM {
private dailyBudget = 100; // dollars
private dailySpent = 0;
private costHistory: Map<string, number> = new Map();
async callLLM(
prompt: string,
maxTokens: number = 1024
): Promise<{ response: string; cost: number }> {
// Check budget
if (this.dailySpent > this.dailyBudget) {
throw new Error('Daily budget exceeded');
}
// Use cheaper models for simple tasks
const model = this.selectModel(prompt);
// Add fallback for cost control
const response = await this.callWithFallback(prompt, model, maxTokens);
// Calculate cost
const estimatedCost =
(response.usage.input_tokens *
PRICING[model].input +
response.usage.output_tokens * PRICING[model].output) *
1.1; // 10% buffer
this.dailySpent += estimatedCost;
this.costHistory.set(new Date().toISOString(), estimatedCost);
return {
response: response.content[0].text,
cost: estimatedCost,
};
}
private selectModel(prompt: string): string {
// Use cheaper Haiku model for simple queries
if (prompt.length < 100) {
return 'claude-3-5-haiku-20241022';
}
// Use Sonnet for complex tasks
return 'claude-3-5-sonnet-20241022';
}
private async callWithFallback(
prompt: string,
model: string,
maxTokens: number
): Promise<any> {
const client = new Anthropic();
try {
return await client.messages.create({
model,
max_tokens: maxTokens,
messages: [{ role: 'user', content: prompt }],
});
} catch (error: any) {
if (error.status === 429) {
// Rate limited, use cached response if available
return this.getCachedResponse(prompt);
}
throw error;
}
}
private getCachedResponse(prompt: string): any {
// Implement caching for common queries
return {
content: [{ text: 'Cached response...' }],
usage: { input_tokens: 0, output_tokens: 50 },
};
}
}

import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Service
public class CostAwareLLM {
private static final Map<String, double[]> PRICING = Map.of(
"claude-3-5-sonnet-20241022", new double[]{0.003 / 1000.0, 0.015 / 1000.0},
"claude-3-5-haiku-20241022", new double[]{0.001 / 1000.0, 0.005 / 1000.0}
);
private final double dailyBudget = 100.0;
private final AtomicReference<Double> dailySpent = new AtomicReference<>(0.0);
private final Map<String, Double> costHistory = new HashMap<>();
public record LLMResult(String response, double cost) {}
public LLMResult callLLM(String prompt, int maxTokens) throws Exception {
if (dailySpent.get() > dailyBudget) {
throw new RuntimeException("Daily budget exceeded");
}
String model = selectModel(prompt);
var apiResponse = callWithFallback(prompt, model, maxTokens);
double[] pricing = PRICING.get(model);
double estimatedCost =
(apiResponse.inputTokens() * pricing[0] +
apiResponse.outputTokens() * pricing[1]) * 1.1;
dailySpent.updateAndGet(v -> v + estimatedCost);
costHistory.put(Instant.now().toString(), estimatedCost);
return new LLMResult(apiResponse.content(), estimatedCost);
}
private String selectModel(String prompt) {
return prompt.length() < 100
? "claude-3-5-haiku-20241022"
: "claude-3-5-sonnet-20241022";
}
private ApiResponse callWithFallback(String prompt, String model, int maxTokens) {
try {
return invokeAnthropicAPI(prompt, model, maxTokens);
} catch (RateLimitException e) {
return getCachedResponse(prompt);
}
}
private ApiResponse getCachedResponse(String prompt) {
return new ApiResponse("Cached response...", 0, 50);
}
private ApiResponse invokeAnthropicAPI(String prompt, String model, int maxTokens) {
// Call Anthropic API via HTTP client or SDK
throw new UnsupportedOperationException("Implement with actual SDK");
}
record ApiResponse(String content, int inputTokens, int outputTokens) {}
static class RateLimitException extends RuntimeException {}
}
import anthropic
from datetime import datetime
from threading import Lock
PRICING = {
"claude-3-5-sonnet-20241022": {"input": 0.003 / 1000, "output": 0.015 / 1000},
"claude-3-5-haiku-20241022": {"input": 0.001 / 1000, "output": 0.005 / 1000},
}
class CostAwareLLM:
def __init__(self, daily_budget: float = 100.0):
self.daily_budget = daily_budget
self.daily_spent = 0.0
self.cost_history: dict[str, float] = {}
self._lock = Lock()
def call_llm(self, prompt: str, max_tokens: int = 1024) -> dict:
with self._lock:
if self.daily_spent > self.daily_budget:
raise RuntimeError("Daily budget exceeded")
model = self._select_model(prompt)
response = self._call_with_fallback(prompt, model, max_tokens)
pricing = PRICING[model]
estimated_cost = (
response.usage.input_tokens * pricing["input"] +
response.usage.output_tokens * pricing["output"]
) * 1.1 # 10% buffer
with self._lock:
self.daily_spent += estimated_cost
self.cost_history[datetime.utcnow().isoformat()] = estimated_cost
return {
"response": response.content[0].text,
"cost": estimated_cost,
}
def _select_model(self, prompt: str) -> str:
if len(prompt) < 100:
return "claude-3-5-haiku-20241022"
return "claude-3-5-sonnet-20241022"
def _call_with_fallback(self, prompt: str, model: str, max_tokens: int):
client = anthropic.Anthropic()
try:
return client.messages.create(
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
)
except anthropic.RateLimitError:
return self._get_cached_response()
def _get_cached_response(self):
# Stub for cached response
class FakeUsage:
input_tokens = 0
output_tokens = 50
class FakeContent:
text = "Cached response..."
class FakeResponse:
content = [FakeContent()]
usage = FakeUsage()
return FakeResponse()
using Anthropic.SDK;
using Anthropic.SDK.Messaging;
public class CostAwareLLM
{
private static readonly Dictionary<string, (double Input, double Output)> Pricing = new()
{
["claude-3-5-sonnet-20241022"] = (0.003 / 1000.0, 0.015 / 1000.0),
["claude-3-5-haiku-20241022"] = (0.001 / 1000.0, 0.005 / 1000.0),
};
private readonly double _dailyBudget = 100.0;
private double _dailySpent = 0.0;
private readonly Dictionary<string, double> _costHistory = new();
private readonly object _lock = new();
public record LLMResult(string Response, double Cost);
public async Task<LLMResult> CallLLMAsync(string prompt, int maxTokens = 1024)
{
lock (_lock)
{
if (_dailySpent > _dailyBudget)
throw new InvalidOperationException("Daily budget exceeded");
}
var model = SelectModel(prompt);
var response = await CallWithFallbackAsync(prompt, model, maxTokens);
var pricing = Pricing[model];
var estimatedCost =
(response.Usage.InputTokens * pricing.Input +
response.Usage.OutputTokens * pricing.Output) * 1.1;
lock (_lock)
{
_dailySpent += estimatedCost;
_costHistory[DateTime.UtcNow.ToString("O")] = estimatedCost;
}
var textContent = response.Content.OfType<TextContent>().First();
return new LLMResult(textContent.Text, estimatedCost);
}
private string SelectModel(string prompt) =>
prompt.Length < 100 ? "claude-3-5-haiku-20241022" : "claude-3-5-sonnet-20241022";
private async Task<MessageResponse> CallWithFallbackAsync(string prompt, string model, int maxTokens)
{
var client = new AnthropicClient(Environment.GetEnvironmentVariable("ANTHROPIC_API_KEY")!);
try
{
return await client.Messages.CreateAsync(new MessageRequest
{
Model = model,
MaxTokens = maxTokens,
Messages = new List<Message> { new() { Role = "user", Content = prompt } }
});
}
catch (AnthropicException ex) when (ex.StatusCode == 429)
{
return GetCachedResponse();
}
}
private MessageResponse GetCachedResponse()
{
// Return a stub cached response
return new MessageResponse
{
Content = new List<ContentBase> { new TextContent { Text = "Cached response..." } },
Usage = new Usage { InputTokens = 0, OutputTokens = 50 }
};
}
}
Response Caching
Cache AI responses to avoid redundant API calls.
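The implementations that follow key the cache on the query text alone, so two requests with the same text but a different model or token limit would share an entry. A minimal sketch of a more specific key is shown here. The `build_cache_key` helper is hypothetical (it is not part of any SDK), and it assumes that collapsing whitespace is safe for your prompts.

```python
import hashlib
import json


def build_cache_key(query: str, model: str, max_tokens: int, prefix: str = "ai_response") -> str:
    """Build a cache key from the inputs that affect the response.

    Hypothetical helper for illustration; the parameter set is an assumption.
    """
    # Collapse runs of whitespace so trivially different inputs share an entry.
    normalized = " ".join(query.split())
    # Serialize with sorted keys so the digest is stable across runs.
    payload = json.dumps(
        {"query": normalized, "model": model, "max_tokens": max_tokens},
        sort_keys=True,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest}"
```

If you also vary the system prompt or temperature per request, those values would need to go into the payload as well.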
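import { createClient } from 'redis';
import crypto from 'crypto';
import Anthropic from '@anthropic-ai/sdk';
// node-redis v4+: the client must be connected before any command is issued
const redisClient = createClient();
await redisClient.connect(); // top-level await needs an ES module; otherwise connect at startup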
interface CachedResponse {
response: string;
timestamp: number;
tokens: { input: number; output: number };
}
async function getCachedOrGenerate(
query: string,
ttl: number = 3600
): Promise<string> {
const cacheKey = `ai_response:${crypto
.createHash('md5')
.update(query)
.digest('hex')}`;
// Try cache first
const cached = await redisClient.get(cacheKey);
if (cached) {
const data: CachedResponse = JSON.parse(cached);
console.log(
`Cache hit! Saved ${data.tokens.input + data.tokens.output} tokens`
);
return data.response;
}
// Generate new response
const client = new Anthropic();
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: query }],
});
const textBlock = response.content.find((block) => block.type === 'text');
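// Check the discriminant so TypeScript narrows the block to a text block
const responseText = textBlock?.type === 'text' ? textBlock.text : '';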
// Cache the response
const cacheData: CachedResponse = {
response: responseText,
timestamp: Date.now(),
tokens: {
input: response.usage.input_tokens,
output: response.usage.output_tokens,
},
};
// node-redis v4+ exposes this command as setEx (camelCase)
await redisClient.setEx(cacheKey, ttl, JSON.stringify(cacheData));
return responseText;
}
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.time.Duration;
@Service
public class AIResponseCache {
private final StringRedisTemplate redis;
private final ObjectMapper objectMapper;
public AIResponseCache(StringRedisTemplate redis, ObjectMapper objectMapper) {
this.redis = redis;
this.objectMapper = objectMapper;
}
public record CachedResponse(String response, long timestamp, int inputTokens, int outputTokens) {}
public String getCachedOrGenerate(String query, int ttlSeconds) throws Exception {
String cacheKey = "ai_response:" + md5(query);
String cached = redis.opsForValue().get(cacheKey);
if (cached != null) {
CachedResponse data = objectMapper.readValue(cached, CachedResponse.class);
System.out.printf("Cache hit! Saved %d tokens%n",
data.inputTokens() + data.outputTokens());
return data.response();
}
// Generate new response via Anthropic API
var apiResult = callAnthropicAPI(query);
CachedResponse cacheData = new CachedResponse(
apiResult.text(), System.currentTimeMillis(),
apiResult.inputTokens(), apiResult.outputTokens()
);
redis.opsForValue().set(
cacheKey,
objectMapper.writeValueAsString(cacheData),
Duration.ofSeconds(ttlSeconds)
);
return apiResult.text();
}
private String md5(String input) throws Exception {
MessageDigest md = MessageDigest.getInstance("MD5");
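// Fix the charset so the key does not depend on the platform default
return HexFormat.of().formatHex(md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)));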
}
private record ApiResult(String text, int inputTokens, int outputTokens) {}
private ApiResult callAnthropicAPI(String query) {
// Implement with actual Anthropic SDK
throw new UnsupportedOperationException("Implement with Anthropic SDK");
}
}
import redis
import hashlib
import json
import anthropic
from dataclasses import dataclass, asdict
from time import time
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
anthropic_client = anthropic.Anthropic()
@dataclass
class CachedResponse:
response: str
timestamp: float
tokens: dict
def get_cached_or_generate(query: str, ttl: int = 3600) -> str:
cache_key = f"ai_response:{hashlib.md5(query.encode()).hexdigest()}"
cached = redis_client.get(cache_key)
if cached:
data = CachedResponse(**json.loads(cached))
saved = data.tokens["input"] + data.tokens["output"]
print(f"Cache hit! Saved {saved} tokens")
return data.response
response = anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": query}],
)
text_block = next((b for b in response.content if b.type == "text"), None)
response_text = text_block.text if text_block else ""
cache_data = CachedResponse(
response=response_text,
timestamp=time(),
tokens={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
},
)
redis_client.setex(cache_key, ttl, json.dumps(asdict(cache_data)))
return response_text
using Anthropic.SDK;
using Anthropic.SDK.Messaging;
using StackExchange.Redis;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
public class AIResponseCache
{
private readonly IDatabase _redis;
private readonly AnthropicClient _client;
public AIResponseCache(IConnectionMultiplexer redis, AnthropicClient client)
{
_redis = redis.GetDatabase();
_client = client;
}
public record CachedResponse(string Response, long Timestamp, int InputTokens, int OutputTokens);
public async Task<string> GetCachedOrGenerateAsync(string query, int ttlSeconds = 3600)
{
var cacheKey = $"ai_response:{ComputeMd5(query)}";
var cached = await _redis.StringGetAsync(cacheKey);
if (cached.HasValue)
{
var data = JsonSerializer.Deserialize<CachedResponse>(cached!)!;
Console.WriteLine($"Cache hit! Saved {data.InputTokens + data.OutputTokens} tokens");
return data.Response;
}
var response = await _client.Messages.CreateAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 1024,
Messages = new List<Message> { new() { Role = "user", Content = query } }
});
var textBlock = response.Content.OfType<TextContent>().FirstOrDefault();
var responseText = textBlock?.Text ?? "";
var cacheData = new CachedResponse(
responseText,
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
response.Usage.InputTokens,
response.Usage.OutputTokens
);
await _redis.StringSetAsync(
cacheKey,
JsonSerializer.Serialize(cacheData),
TimeSpan.FromSeconds(ttlSeconds)
);
return responseText;
}
private static string ComputeMd5(string input)
{
var hash = MD5.HashData(Encoding.UTF8.GetBytes(input));
return Convert.ToHexString(hash).ToLowerInvariant();
}
}
Fallback and Degradation Strategies
Production systems need graceful degradation when AI fails.
interface TextGenerationService {
generate(prompt: string): Promise<string>;
}
class ResilientAIService implements TextGenerationService {
constructor(
private aiService: TextGenerationService,
private fallbackService: TextGenerationService,
private templateService: TemplateService
) {}
async generate(prompt: string): Promise<string> {
try {
// Primary: AI service
return await this.aiService.generate(prompt);
} catch (primaryError) {
console.warn('AI service failed, using fallback:', primaryError);
try {
// Secondary: Fallback AI service
return await this.fallbackService.generate(prompt);
} catch (fallbackError) {
console.warn('Fallback service failed, using template:', fallbackError);
// Tertiary: Template-based response
return this.templateService.getTemplate(prompt);
}
}
}
}
class TemplateService {
private templates: Map<string, string> = new Map([
[
'greeting',
'Hello! I appreciate your message. How can I help you today?',
],
[
'support_contact',
'Please reach out to our support team at support@example.com.',
],
['error', 'I apologize, but I was unable to process your request.'],
]);
getTemplate(prompt: string): string {
if (prompt.toLowerCase().includes('hello')) {
return this.templates.get('greeting') || this.templates.get('error')!;
}
if (prompt.toLowerCase().includes('support')) {
return this.templates.get('support_contact')!;
}
return this.templates.get('error')!;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Map;
public interface TextGenerationService {
String generate(String prompt) throws Exception;
}
@Service
public class ResilientAIService implements TextGenerationService {
private static final Logger log = LoggerFactory.getLogger(ResilientAIService.class);
private final TextGenerationService aiService;
private final TextGenerationService fallbackService;
private final TemplateService templateService;
public ResilientAIService(TextGenerationService aiService,
TextGenerationService fallbackService,
TemplateService templateService) {
this.aiService = aiService;
this.fallbackService = fallbackService;
this.templateService = templateService;
}
@Override
public String generate(String prompt) throws Exception {
try {
return aiService.generate(prompt);
} catch (Exception primaryError) {
log.warn("AI service failed, using fallback: {}", primaryError.getMessage());
try {
return fallbackService.generate(prompt);
} catch (Exception fallbackError) {
log.warn("Fallback service failed, using template: {}", fallbackError.getMessage());
return templateService.getTemplate(prompt);
}
}
}
}
@Service
class TemplateService {
private static final Map<String, String> TEMPLATES = Map.of(
"greeting", "Hello! I appreciate your message. How can I help you today?",
"support_contact", "Please reach out to our support team at support@example.com.",
"error", "I apologize, but I was unable to process your request."
);
public String getTemplate(String prompt) {
String lower = prompt.toLowerCase();
if (lower.contains("hello")) return TEMPLATES.get("greeting");
if (lower.contains("support")) return TEMPLATES.get("support_contact");
return TEMPLATES.get("error");
}
}
import logging
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class TextGenerationService(ABC):
@abstractmethod
async def generate(self, prompt: str) -> str: ...
class TemplateService:
_templates = {
"greeting": "Hello! I appreciate your message. How can I help you today?",
"support_contact": "Please reach out to our support team at support@example.com.",
"error": "I apologize, but I was unable to process your request.",
}
def get_template(self, prompt: str) -> str:
lower = prompt.lower()
if "hello" in lower:
return self._templates["greeting"]
if "support" in lower:
return self._templates["support_contact"]
return self._templates["error"]
class ResilientAIService(TextGenerationService):
def __init__(
self,
ai_service: TextGenerationService,
fallback_service: TextGenerationService,
template_service: TemplateService,
):
self.ai_service = ai_service
self.fallback_service = fallback_service
self.template_service = template_service
async def generate(self, prompt: str) -> str:
try:
return await self.ai_service.generate(prompt)
except Exception as primary_error:
logger.warning("AI service failed, using fallback: %s", primary_error)
try:
return await self.fallback_service.generate(prompt)
except Exception as fallback_error:
logger.warning("Fallback service failed, using template: %s", fallback_error)
return self.template_service.get_template(prompt)
using Microsoft.Extensions.Logging;
public interface ITextGenerationService
{
Task<string> GenerateAsync(string prompt);
}
public class TemplateService
{
private static readonly Dictionary<string, string> Templates = new()
{
["greeting"] = "Hello! I appreciate your message. How can I help you today?",
["support_contact"] = "Please reach out to our support team at support@example.com.",
["error"] = "I apologize, but I was unable to process your request.",
};
public string GetTemplate(string prompt)
{
var lower = prompt.ToLowerInvariant();
if (lower.Contains("hello")) return Templates["greeting"];
if (lower.Contains("support")) return Templates["support_contact"];
return Templates["error"];
}
}
public class ResilientAIService : ITextGenerationService
{
private readonly ITextGenerationService _aiService;
private readonly ITextGenerationService _fallbackService;
private readonly TemplateService _templateService;
private readonly ILogger<ResilientAIService> _logger;
public ResilientAIService(
ITextGenerationService aiService,
ITextGenerationService fallbackService,
TemplateService templateService,
ILogger<ResilientAIService> logger)
{
_aiService = aiService;
_fallbackService = fallbackService;
_templateService = templateService;
_logger = logger;
}
public async Task<string> GenerateAsync(string prompt)
{
try
{
return await _aiService.GenerateAsync(prompt);
}
catch (Exception primaryError)
{
_logger.LogWarning("AI service failed, using fallback: {Error}", primaryError.Message);
try
{
return await _fallbackService.GenerateAsync(prompt);
}
catch (Exception fallbackError)
{
_logger.LogWarning("Fallback service failed, using template: {Error}", fallbackError.Message);
return _templateService.GetTemplate(prompt);
}
}
}
}
Monitoring and Quality Assurance
What gets measured gets managed. Monitor AI quality rigorously.
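The monitors that follow collect one record per request. Averages hide slow outliers, so it can help to summarize those records as percentiles plus the share of low ratings. A minimal sketch, assuming a simplified `RequestMetric` record (hypothetical, not the classes below) and the same "rating below 3 is low" threshold the article uses:

```python
import math
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RequestMetric:
    latency_ms: float
    user_rating: Optional[int] = None  # None when the user gave no rating


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


def summarize(metrics: List[RequestMetric]) -> dict:
    """Summarize latency percentiles and the share of rated responses scored below 3."""
    latencies = [m.latency_ms for m in metrics]
    rated = [m for m in metrics if m.user_rating is not None]
    low = [m for m in rated if m.user_rating < 3]
    return {
        "p50_ms": percentile(latencies, 50),
        "p95_ms": percentile(latencies, 95),
        "low_rating_share": len(low) / len(rated) if rated else 0.0,
    }
```

In a real deployment the monitoring backend usually computes percentiles for you; the sketch only shows which numbers are worth watching.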
interface AIQualityMetrics {
latency: number;
cost: number;
tokensUsed: { input: number; output: number };
userRating?: number;
userFeedback?: string;
timestamp: Date;
}
class AIMonitor {
private metrics: AIQualityMetrics[] = [];
async trackRequest<T>(
fn: () => Promise<T>,
userId: string
): Promise<{ result: T; metrics: AIQualityMetrics }> {
const startTime = Date.now();
try {
const result = await fn();
const latency = Date.now() - startTime;
const metrics: AIQualityMetrics = {
latency,
cost: 0, // Would be calculated from actual API calls
tokensUsed: { input: 0, output: 0 },
timestamp: new Date(),
};
this.metrics.push(metrics);
this.logMetrics(metrics);
return { result, metrics };
} catch (error) {
const latency = Date.now() - startTime;
console.error('AI request failed', {
userId,
latency,
error: error instanceof Error ? error.message : String(error),
});
throw error;
}
}
private logMetrics(metrics: AIQualityMetrics): void {
// Send to monitoring system (DataDog, New Relic, etc.)
console.log('AI Metrics:', {
latency: `${metrics.latency}ms`,
cost: `$${metrics.cost.toFixed(4)}`,
tokens: metrics.tokensUsed,
});
}
async captureUserFeedback(
requestId: string,
rating: number,
feedback?: string
): Promise<void> {
// Allow users to rate AI responses; requestId is the metric's timestamp in ms, as a string
const metric = this.metrics.find((m) => String(m.timestamp.getTime()) === requestId);
if (metric) {
metric.userRating = rating;
metric.userFeedback = feedback;
// Flag low-rated responses for later review
if (rating < 3) {
await this.logFailedResponse(metric);
}
}
}
private async logFailedResponse(metric: AIQualityMetrics): Promise<void> {
// Store poor quality responses for later analysis and model fine-tuning
console.log('Low quality response detected:', metric);
}
}
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@Component
public class AIMonitor {
private static final Logger log = LoggerFactory.getLogger(AIMonitor.class);
private final MeterRegistry meterRegistry;
private final List<AIQualityMetrics> metrics = new ArrayList<>();
public AIMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public record AIQualityMetrics(
long latencyMs, double cost, int inputTokens, int outputTokens,
Integer userRating, String userFeedback, Instant timestamp
) {}
public <T> TrackResult<T> trackRequest(Callable<T> fn, String userId) throws Exception {
long startTime = System.currentTimeMillis();
try {
T result = fn.call();
long latency = System.currentTimeMillis() - startTime;
var m = new AIQualityMetrics(latency, 0.0, 0, 0, null, null, Instant.now());
metrics.add(m);
logMetrics(m);
// Record to Micrometer/DataDog
meterRegistry.timer("ai.request.latency").record(latency,
java.util.concurrent.TimeUnit.MILLISECONDS);
return new TrackResult<>(result, m);
} catch (Exception e) {
long latency = System.currentTimeMillis() - startTime;
log.error("AI request failed: userId={}, latency={}ms, error={}", userId, latency, e.getMessage());
throw e;
}
}
public void captureUserFeedback(Instant timestamp, int rating, String feedback) {
metrics.stream()
.filter(m -> m.timestamp().equals(timestamp))
.findFirst()
.ifPresent(m -> {
if (rating < 3) {
log.warn("Low quality response detected: latency={}ms, cost={}", m.latencyMs(), m.cost());
}
});
}
private void logMetrics(AIQualityMetrics m) {
log.info("AI Metrics: latency={}ms, cost=${}, tokens=in:{}/out:{}",
m.latencyMs(), String.format("%.4f", m.cost()), m.inputTokens(), m.outputTokens());
}
public record TrackResult<T>(T result, AIQualityMetrics metrics) {}
}
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
@dataclass
class AIQualityMetrics:
latency: float
cost: float
tokens_used: dict
timestamp: datetime
user_rating: int | None = None
user_feedback: str | None = None
@dataclass
class AIMonitor:
_metrics: list[AIQualityMetrics] = field(default_factory=list)
async def track_request(self, fn: Callable, user_id: str) -> tuple[Any, AIQualityMetrics]:
start_time = time.time()
try:
result = await fn()
latency = (time.time() - start_time) * 1000
metrics = AIQualityMetrics(
latency=latency,
cost=0.0,
tokens_used={"input": 0, "output": 0},
timestamp=datetime.utcnow(),
)
self._metrics.append(metrics)
self._log_metrics(metrics)
return result, metrics
except Exception as error:
latency = (time.time() - start_time) * 1000
logger.error("AI request failed", extra={
"user_id": user_id, "latency_ms": latency, "error": str(error)
})
raise
def _log_metrics(self, metrics: AIQualityMetrics) -> None:
logger.info(
"AI Metrics: latency=%.0fms cost=$%.4f tokens=%s",
metrics.latency,
metrics.cost,
metrics.tokens_used,
)
async def capture_user_feedback(
self, request_id: str, rating: int, feedback: str | None = None
) -> None:
metric = next(
(m for m in self._metrics if str(m.timestamp.timestamp()) == request_id), None
)
if metric:
metric.user_rating = rating
metric.user_feedback = feedback
if rating < 3:
await self._log_failed_response(metric)
async def _log_failed_response(self, metric: AIQualityMetrics) -> None:
logger.warning("Low quality response detected: %s", metric)
using Microsoft.Extensions.Logging;
public record AIQualityMetrics(
double LatencyMs,
double Cost,
int InputTokens,
int OutputTokens,
DateTime Timestamp,
int? UserRating = null,
string? UserFeedback = null
);
public class AIMonitor
{
private readonly List<AIQualityMetrics> _metrics = new();
private readonly ILogger<AIMonitor> _logger;
public AIMonitor(ILogger<AIMonitor> logger) => _logger = logger;
public async Task<(T Result, AIQualityMetrics Metrics)> TrackRequestAsync<T>(
Func<Task<T>> fn, string userId)
{
var startTime = DateTime.UtcNow;
var sw = System.Diagnostics.Stopwatch.StartNew();
try
{
var result = await fn();
sw.Stop();
var metrics = new AIQualityMetrics(
sw.Elapsed.TotalMilliseconds, 0.0, 0, 0, startTime
);
_metrics.Add(metrics);
LogMetrics(metrics);
return (result, metrics);
}
catch (Exception ex)
{
sw.Stop();
_logger.LogError("AI request failed: userId={UserId}, latency={Latency}ms, error={Error}",
userId, sw.Elapsed.TotalMilliseconds, ex.Message);
throw;
}
}
private void LogMetrics(AIQualityMetrics metrics)
{
_logger.LogInformation(
"AI Metrics: latency={Latency}ms cost=${Cost:F4} tokens=in:{Input}/out:{Output}",
metrics.LatencyMs, metrics.Cost, metrics.InputTokens, metrics.OutputTokens);
}
public async Task CaptureUserFeedbackAsync(DateTime timestamp, int rating, string? feedback = null)
{
var metric = _metrics.FirstOrDefault(m => m.Timestamp == timestamp);
if (metric is not null && rating < 3)
{
_logger.LogWarning("Low quality response detected: latency={Latency}ms", metric.LatencyMs);
await Task.CompletedTask;
}
}
}
Prompt Engineering for Production
Prompts determine quality. Invest heavily in prompt engineering.
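The examples that follow compare two prompts offline over a fixed list of queries. When two prompt versions are served to real users at the same time, each user should see the same version on every request. A minimal sketch of deterministic assignment, assuming a hypothetical `assign_prompt_version` helper, a string user ID, and an experiment name of your choosing:

```python
import hashlib
from typing import List


def assign_prompt_version(user_id: str, experiment: str, versions: List[str]) -> str:
    """Map a user to one prompt version, stably across requests and processes."""
    if not versions:
        raise ValueError("at least one version is required")
    # Python's built-in hash() is salted per process for strings, so use a digest instead.
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % len(versions)
    return versions[bucket]
```

Including the experiment name in the hash keeps assignments in one experiment independent of assignments in another.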
// Version controlled prompts
const SYSTEM_PROMPTS = {
v1: `You are a customer support specialist...`,
v2: `You are an expert customer support agent with deep knowledge...
Guidelines:
- Always be empathetic
- Provide specific solutions, not generic advice
- Escalate to human if you cannot help`,
};
// Prompt templates with variables
function generatePrompt(
context: 'support' | 'sales' | 'technical',
query: string,
userHistory?: string
): string {
const base = SYSTEM_PROMPTS.v2;
const contextualGuidance = {
support:
'Focus on resolving the user\'s problem quickly. Prioritize their satisfaction.',
sales:
'Listen to their needs. Recommend products that solve their specific problems.',
technical:
'Provide accurate technical information. Link to documentation when relevant.',
};
let prompt = base + '\n\n' + contextualGuidance[context];
if (userHistory) {
prompt += `\n\nUser conversation history:\n${userHistory}`;
}
prompt += `\n\nUser query: ${query}`;
return prompt;
}
// A/B testing prompts
class PromptExperiment {
async runExperiment(
queries: string[],
promptA: string,
promptB: string
): Promise<{ winner: 'A' | 'B'; improvement: number }> {
const resultsA = [];
const resultsB = [];
for (const query of queries) {
const resA = await callLLM(promptA + '\n\nQuery: ' + query);
const resB = await callLLM(promptB + '\n\nQuery: ' + query);
resultsA.push(this.scoreResponse(resA));
resultsB.push(this.scoreResponse(resB));
}
// Guard against an empty query list and a zero baseline
const mean = (xs: number[]) => (xs.length ? xs.reduce((a, b) => a + b, 0) / xs.length : 0);
const avgA = mean(resultsA);
const avgB = mean(resultsB);
return {
winner: avgA > avgB ? 'A' : 'B',
improvement: avgA === 0 ? 0 : Math.abs((avgB - avgA) / avgA) * 100,
};
}
private scoreResponse(response: string): number {
// Placeholder heuristic: length alone is not a quality signal. Replace it with a
// real evaluation (relevance, correctness, human ratings) before trusting the result.
return response.length / 100;
}
}
async function callLLM(prompt: string): Promise<string> {
const client = new Anthropic();
const response = await client.messages.create({
model: 'claude-3-5-sonnet-20241022',
max_tokens: 1024,
messages: [{ role: 'user', content: prompt }],
});
const textBlock = response.content.find((block) => block.type === 'text');
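// Check the discriminant so TypeScript narrows the block to a text block
return textBlock?.type === 'text' ? textBlock.text : '';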
}
import java.util.*;
public class PromptEngineering {
private static final Map<String, String> SYSTEM_PROMPTS = Map.of(
"v1", "You are a customer support specialist...",
"v2", """
You are an expert customer support agent with deep knowledge...
Guidelines:
- Always be empathetic
- Provide specific solutions, not generic advice
- Escalate to human if you cannot help"""
);
private static final Map<String, String> CONTEXTUAL_GUIDANCE = Map.of(
"support", "Focus on resolving the user's problem quickly. Prioritize their satisfaction.",
"sales", "Listen to their needs. Recommend products that solve their specific problems.",
"technical", "Provide accurate technical information. Link to documentation when relevant."
);
public String generatePrompt(String context, String query, String userHistory) {
String base = SYSTEM_PROMPTS.get("v2");
String guidance = CONTEXTUAL_GUIDANCE.getOrDefault(context, "");
StringBuilder prompt = new StringBuilder(base).append("\n\n").append(guidance);
if (userHistory != null && !userHistory.isBlank()) {
prompt.append("\n\nUser conversation history:\n").append(userHistory);
}
prompt.append("\n\nUser query: ").append(query);
return prompt.toString();
}
public record ExperimentResult(String winner, double improvement) {}
public ExperimentResult runExperiment(List<String> queries, String promptA, String promptB) {
List<Double> scoresA = new ArrayList<>();
List<Double> scoresB = new ArrayList<>();
for (String query : queries) {
scoresA.add(scoreResponse(callLLM(promptA + "\n\nQuery: " + query)));
scoresB.add(scoreResponse(callLLM(promptB + "\n\nQuery: " + query)));
}
double avgA = scoresA.stream().mapToDouble(Double::doubleValue).average().orElse(0);
double avgB = scoresB.stream().mapToDouble(Double::doubleValue).average().orElse(0);
String winner = avgA > avgB ? "A" : "B";
// Avoid NaN or Infinity when the baseline average is zero
double improvement = avgA == 0 ? 0 : Math.abs((avgB - avgA) / avgA) * 100;
return new ExperimentResult(winner, improvement);
}
private double scoreResponse(String response) {
return response.length() / 100.0;
}
private String callLLM(String prompt) {
// Implement with Anthropic SDK
return "";
}
}
import anthropic
from dataclasses import dataclass
SYSTEM_PROMPTS = {
"v1": "You are a customer support specialist...",
"v2": """You are an expert customer support agent with deep knowledge...
Guidelines:
- Always be empathetic
- Provide specific solutions, not generic advice
- Escalate to human if you cannot help""",
}
CONTEXTUAL_GUIDANCE = {
"support": "Focus on resolving the user's problem quickly. Prioritize their satisfaction.",
"sales": "Listen to their needs. Recommend products that solve their specific problems.",
"technical": "Provide accurate technical information. Link to documentation when relevant.",
}
def generate_prompt(context: str, query: str, user_history: str | None = None) -> str:
base = SYSTEM_PROMPTS["v2"]
guidance = CONTEXTUAL_GUIDANCE.get(context, "")
prompt = f"{base}\n\n{guidance}"
if user_history:
prompt += f"\n\nUser conversation history:\n{user_history}"
prompt += f"\n\nUser query: {query}"
return prompt
async def call_llm(prompt: str) -> str:
# Use the async client so the call does not block the event loop
client = anthropic.AsyncAnthropic()
response = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
)
text_block = next((b for b in response.content if b.type == "text"), None)
return text_block.text if text_block else ""
@dataclass
class ExperimentResult:
winner: str
improvement: float
class PromptExperiment:
async def run_experiment(
self, queries: list[str], prompt_a: str, prompt_b: str
) -> ExperimentResult:
scores_a = []
scores_b = []
for query in queries:
res_a = await call_llm(prompt_a + "\n\nQuery: " + query)
res_b = await call_llm(prompt_b + "\n\nQuery: " + query)
scores_a.append(self._score_response(res_a))
scores_b.append(self._score_response(res_b))
avg_a = sum(scores_a) / len(scores_a)
avg_b = sum(scores_b) / len(scores_b)
winner = "A" if avg_a > avg_b else "B"
improvement = abs((avg_b - avg_a) / avg_a) * 100 if avg_a else 0
return ExperimentResult(winner=winner, improvement=improvement)
def _score_response(self, response: str) -> float:
return len(response) / 100
using Anthropic.SDK;
using Anthropic.SDK.Messaging;
public class PromptEngineering
{
private static readonly Dictionary<string, string> SystemPrompts = new()
{
["v1"] = "You are a customer support specialist...",
["v2"] = """
You are an expert customer support agent with deep knowledge...
Guidelines:
- Always be empathetic
- Provide specific solutions, not generic advice
- Escalate to human if you cannot help
""",
};
private static readonly Dictionary<string, string> ContextualGuidance = new()
{
["support"] = "Focus on resolving the user's problem quickly. Prioritize their satisfaction.",
["sales"] = "Listen to their needs. Recommend products that solve their specific problems.",
["technical"] = "Provide accurate technical information. Link to documentation when relevant.",
};
public string GeneratePrompt(string context, string query, string? userHistory = null)
{
var base_ = SystemPrompts["v2"];
var guidance = ContextualGuidance.GetValueOrDefault(context, "");
var prompt = $"{base_}\n\n{guidance}";
if (!string.IsNullOrWhiteSpace(userHistory))
prompt += $"\n\nUser conversation history:\n{userHistory}";
prompt += $"\n\nUser query: {query}";
return prompt;
}
public async Task<string> CallLLMAsync(string prompt)
{
var client = new AnthropicClient(Environment.GetEnvironmentVariable("ANTHROPIC_API_KEY")!);
var response = await client.Messages.CreateAsync(new MessageRequest
{
Model = "claude-3-5-sonnet-20241022",
MaxTokens = 1024,
Messages = new List<Message> { new() { Role = "user", Content = prompt } }
});
return response.Content.OfType<TextContent>().FirstOrDefault()?.Text ?? "";
}
public record ExperimentResult(string Winner, double Improvement);
public async Task<ExperimentResult> RunExperimentAsync(
List<string> queries, string promptA, string promptB)
{
var scoresA = new List<double>();
var scoresB = new List<double>();
foreach (var query in queries)
{
var resA = await CallLLMAsync(promptA + "\n\nQuery: " + query);
var resB = await CallLLMAsync(promptB + "\n\nQuery: " + query);
scoresA.Add(ScoreResponse(resA));
scoresB.Add(ScoreResponse(resB));
}
var avgA = scoresA.Average();
var avgB = scoresB.Average();
var winner = avgA > avgB ? "A" : "B";
// Guard against a zero baseline, matching the Python version above
var improvement = avgA != 0 ? Math.Abs((avgB - avgA) / avgA) * 100 : 0;
return new ExperimentResult(winner, improvement);
}
private double ScoreResponse(string response) => response.Length / 100.0;
}
When NOT to Use AI
The most important lesson I’ve learned is knowing when not to use AI.
Don’t use AI for:
- Deterministic problems: Use traditional algorithms for calculations, sorting, searching
- Performance-critical code paths: AI has inherent latency
- High-security operations: AI output must be verified before it triggers any sensitive action
- Tasks requiring guarantees: Legal compliance, financial accuracy, life-safety decisions
- Domains where you lack data: Without grounding data, AI produces confident-sounding but incorrect answers
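The point about verifying AI output before sensitive actions can be sketched as a deterministic gate that sits between the model and the side effect. This is a minimal illustration under stated assumptions, not a prescribed implementation: `ProposedRefund`, `MAX_AUTO_REFUND`, and `validate_refund` are hypothetical names, and the rules stand in for whatever business constraints actually apply.

```python
from dataclasses import dataclass

# Hypothetical policy limit; a real system would load this from configuration.
MAX_AUTO_REFUND = 50.00


@dataclass(frozen=True)
class ProposedRefund:
    """An action the model has proposed, parsed from its structured output."""
    order_id: str
    amount: float


def validate_refund(
    proposal: ProposedRefund, order_totals: dict[str, float]
) -> tuple[bool, str]:
    """Check a model-proposed refund against deterministic rules.

    Returns (approved, reason). Nothing here calls a model: the checks are
    plain code, so the outcome is predictable and auditable.
    """
    if proposal.order_id not in order_totals:
        return False, "unknown order"
    if proposal.amount <= 0:
        return False, "non-positive amount"
    if proposal.amount > order_totals[proposal.order_id]:
        return False, "exceeds order total"
    if proposal.amount > MAX_AUTO_REFUND:
        return False, "needs human approval"
    return True, "ok"


# Illustrative data only: order IDs mapped to the amount originally charged.
orders = {"A100": 30.00, "B200": 400.00}
print(validate_refund(ProposedRefund("A100", 25.00), orders))   # (True, 'ok')
print(validate_refund(ProposedRefund("B200", 120.00), orders))  # (False, 'needs human approval')
print(validate_refund(ProposedRefund("Z999", 5.00), orders))    # (False, 'unknown order')
```

One reasonable design choice is to route rejected proposals to a human review queue instead of retrying them against the model, since a retry can simply reproduce the same invalid action.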
AI excels at:
- Content generation and summarization
- Classification and sentiment analysis
- Pattern matching and anomaly detection
- Natural language understanding and generation
- Customer service and chat applications
Final Thoughts
Integrating AI into production is not about using the latest models. It’s about architectural discipline: understanding your costs, building fallbacks, monitoring quality, and knowing when AI is the wrong tool.
The companies winning with AI aren’t those pushing the boundaries of what models can do. They’re the ones that cleanly integrate AI into reliable systems with proper safeguards, monitoring, and human oversight.
Your AI system is only as reliable as its fallback. Design accordingly.