Files
opantoantro/handlers/anthropic.go
T

363 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package handlers
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/gofiber/fiber/v3"
"gorm.io/gorm"
"optoant/config"
"optoant/internal/logger"
"optoant/internal/proxy"
"optoant/internal/transform"
"optoant/models"
)
// infoPage returns an HTML page explaining the Anthropic endpoint.
func infoPage(c fiber.Ctx, cfg *config.Config) error {
html := fmt.Sprintf(`<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Anthropic API — optoant</title>
<style>
body { font-family: -apple-system, BlinkMacSystemFont, sans-serif; max-width: 720px; margin: 60px auto; padding: 0 20px; color: #222; }
code { background: #f0f0f0; padding: 2px 6px; border-radius: 4px; }
pre { background: #1e1e1e; color: #d4d4d4; padding: 16px; border-radius: 8px; overflow-x: auto; }
.ok { color: #22c55e; }
.url { color: #6366f1; }
</style>
</head>
<body>
<h1>Anthropic API — <span class="ok">aktif</span></h1>
<p>Bu endpoint Anthropic Messages API formatını kabul eder, OpenAI formatına çevirir ve upstream'e iletir.</p>
<h3>Endpoint</h3>
<code>POST http://%s/anthropic/v1/messages</code>
<h3>Upstream</h3>
<code class="url">%s/v1/chat/completions</code>
<h3>Örnek curl</h3>
<pre>curl -X POST "http://%s/anthropic/v1/messages" \
-H "Authorization: Bearer $KEY" \
-H "Content-Type: application/json" \
-d '{
"model": "claude-3-5-sonnet-20241022",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello!"}]
}'</pre>
<p><small>Swagger UI: <a href="/swagger/">/swagger/</a> | Health: <a href="/health">/health</a></small></p>
</body>
</html>`, c.Hostname(), strings.TrimRight(cfg.OpenAIBackend, "/"), c.Hostname())
return c.Type("html").SendString(html)
}
// AnthropicHandler handles all /anthropic/* requests.
// Always converts the Anthropic request to OpenAI format, forwards to OPENAI_BACKEND,
// and converts the OpenAI response back to Anthropic format.
//
// @Summary Anthropic-compatible proxy
// @Description Converts Anthropic Messages API requests to OpenAI format, forwards to OPENAI_BACKEND, and converts the response back to Anthropic format.
// @Tags anthropic
// @Accept json
// @Produce json
// @Param path path string true "Anthropic API path (e.g. v1/messages)"
// @Success 200 {object} map[string]interface{}
// @Failure 400 {object} map[string]string
// @Failure 502 {object} map[string]string
// @Router /anthropic/{path} [post]
func AnthropicHandler(cfg *config.Config, db *gorm.DB) fiber.Handler {
return func(c fiber.Ctx) error {
// ── Handle Anthropic Models API ──
if c.Method() == "GET" && strings.HasSuffix(c.OriginalURL(), "/v1/models") {
return modelsList(c, cfg)
}
bodyBytes := c.Body()
// HEAD → 404 (matching DeepSeek behavior)
if c.Method() == "HEAD" {
return c.Status(fiber.StatusNotFound).SendString("")
}
// Empty body → API status
if len(bodyBytes) == 0 {
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"type": "message",
"role": "assistant",
"content": []fiber.Map{{"type": "text", "text": "optoant gateway ready"}},
"stop_reason": "end_turn",
})
}
// Inject default model from config if request body has no model field
if cfg.OpenAIModel != "" {
bodyBytes = injectDefaultModel(bodyBytes, cfg.OpenAIModel)
}
start := time.Now()
reqHeaders := fiberToHTTPHeaders(c)
// Convert x-api-key → Authorization: Bearer (Claude Code compatibility)
if reqHeaders.Get("Authorization") == "" && reqHeaders.Get("X-Api-Key") != "" {
reqHeaders.Set("Authorization", "Bearer "+reqHeaders.Get("X-Api-Key"))
logger.Debug("│ 🔑 Converted x-api-key → Authorization: Bearer")
}
// Auto-inject API key from .env if client didn't provide one
if cfg.OpenAIApiKey != "" && reqHeaders.Get("Authorization") == "" {
reqHeaders.Set("Authorization", "Bearer "+cfg.OpenAIApiKey)
logger.Debug("│ 🔑 Auto-injected API key from OPENAI_KEY env")
}
// ── LOG: Incoming Anthropic Request ──
logger.Info("┌─ [ANTHROPIC] >>> %s %s | IP: %s", c.Method(), c.OriginalURL(), c.IP())
logger.Debug("│ Headers: %v", proxy.MaskSensitiveHeaders(reqHeaders))
logger.Debug("│ Body: %s", string(bodyBytes))
// Try Anthropic→Bifrost transform; if it fails, forward raw body to Bifrost
converted, err := transform.AnthropicToBifrost(bodyBytes)
useConverted := err == nil
if !useConverted {
logger.Debug("│ ⚠️ Not valid Anthropic format (%v) → passthrough", err)
converted = bodyBytes
} else {
// ── LOG: Converted Bifrost Request ──
logger.Debug("│ ---> CONVERTED TO OPENAI/BIFROST:")
logger.Debug("│ %s", string(converted))
}
// Forward to Bifrost — always use /v1/chat/completions, Bifrost handles routing
targetURL := strings.TrimRight(cfg.OpenAIBackend, "/") + "/v1/chat/completions"
logger.Debug("│ 🚀 Forwarding to: %s", targetURL)
// Streaming path: if enabled and original request had stream=true, handle SSE transform
if cfg.Streaming && isStreamRequest(bodyBytes) {
return handleStreaming(c, targetURL, reqHeaders, converted, cfg, db, bodyBytes, start)
}
result, err := proxy.Forward(
c.Context(),
c.Method(),
targetURL,
reqHeaders,
bytes.NewReader(converted),
cfg.RequestTimeoutSeconds,
)
latency := time.Since(start).Milliseconds()
statusCode := 502
if err == nil {
statusCode = result.StatusCode
}
// DB logging
if db != nil {
logEntry := &models.RequestLog{
Endpoint: c.OriginalURL(),
Method: c.Method(),
ClientIP: c.IP(),
RequestBody: models.TruncateBody(string(bodyBytes)),
ResponseStatus: statusCode,
LatencyMs: latency,
}
go db.Create(logEntry) //nolint:errcheck
}
if err != nil {
logger.Warn("│ ❌ UPSTREAM ERROR [IP: %s]: %v", c.IP(), err)
logger.Warn("└─ [ANTHROPIC] <<< 502 (%dms) | IP: %s", latency, c.IP())
return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
"error": fmt.Sprintf("upstream error: %v", err),
})
}
// ── LOG: Raw Bifrost Response ──
logger.Debug("│ <--- BIFROST RESPONSE (%d bytes, status %d):", len(result.Body), result.StatusCode)
logger.Debug("│ %s", string(result.Body))
// If request was Anthropic format, convert response back to Anthropic
if useConverted && result.StatusCode == 200 {
antBody, terr := transform.BifrostToAnthropic(result.Body)
if terr == nil {
// ── LOG: Final Anthropic Response ──
logger.Debug("│ <<< CONVERTED TO ANTHROPIC:")
logger.Debug("│ %s", string(antBody))
logger.Info("└─ [ANTHROPIC] <<< 200 OK (%dms)", latency)
c.Set("Content-Type", "application/json")
return c.Status(result.StatusCode).Send(antBody)
}
logger.Warn("│ ⚠️ RESPONSE TRANSFORM FAILED [IP: %s]: %v (forwarding raw)", c.IP(), terr)
}
logger.Info("└─ [ANTHROPIC] <<< %d (passthrough, %dms)", result.StatusCode, latency)
copyResponseHeaders(c, result.Headers)
return c.Status(result.StatusCode).Send(result.Body)
}
}
// modelsList returns the Anthropic-compatible models list, including the
// configured OPENAI_MODEL if set.
func modelsList(c fiber.Ctx, cfg *config.Config) error {
type ModelData struct {
ID string `json:"id"`
Type string `json:"type"`
DisplayName string `json:"display_name"`
CreatedAt string `json:"created_at"`
}
models := []ModelData{}
if cfg.OpenAIModel != "" {
models = append(models, ModelData{
ID: cfg.OpenAIModel,
Type: "model",
DisplayName: cfg.OpenAIModel,
CreatedAt: "2026-01-01T00:00:00Z",
})
}
// Add fallback models if none configured
if len(models) == 0 {
models = []ModelData{
{ID: "deepseek-v4-flash", Type: "model", DisplayName: "DeepSeek V4 Flash", CreatedAt: "2026-01-01T00:00:00Z"},
{ID: "deepseek-v4-pro", Type: "model", DisplayName: "DeepSeek V4 Pro", CreatedAt: "2026-01-01T00:00:00Z"},
}
}
response := map[string]interface{}{
"data": models,
"has_more": false,
"first_id": models[0].ID,
"last_id": models[len(models)-1].ID,
}
body, _ := json.Marshal(response)
logger.Info("[ANTHROPIC] GET /v1/models -> 200 OK")
return c.Type("json").Send(body)
}
// isStreamRequest checks if the original body has "stream": true.
func isStreamRequest(body []byte) bool {
var m map[string]interface{}
if json.Unmarshal(body, &m) != nil {
return false
}
if v, ok := m["stream"]; ok {
if b, ok := v.(bool); ok {
return b
}
}
return false
}
// handleStreaming forwards a request to upstream with streaming enabled,
// transforms OpenAI SSE chunks into Anthropic SSE events, and streams
// the result back to the client via Fiber.
func handleStreaming(
c fiber.Ctx,
targetURL string,
headers http.Header,
body []byte,
cfg *config.Config,
db *gorm.DB,
originalBody []byte,
start time.Time,
) error {
// Direct HTTP request to upstream (bypass proxy.Forward for streaming)
ctx, cancel := context.WithTimeout(c.Context(), time.Duration(cfg.RequestTimeoutSeconds)*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", targetURL, bytes.NewReader(body))
if err != nil {
return c.Status(fiber.StatusInternalServerError).SendString("build request failed")
}
for key, vals := range headers {
for _, v := range vals {
req.Header.Add(key, v)
}
}
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
logger.Warn("[ANTHROPIC] Stream error [IP: %s]: %v", c.IP(), err)
return c.Status(fiber.StatusBadGateway).SendString("upstream error")
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := io.ReadAll(resp.Body)
logger.Warn("[ANTHROPIC] Stream upstream %d [IP: %s]: %s", resp.StatusCode, c.IP(), string(body))
return c.Status(resp.StatusCode).Send(body)
}
// Stream the SSE response
// Capture values before SetBodyStreamWriter — the callback runs in a
// separate goroutine where fiber.Ctx is no longer valid.
clientIP := c.IP()
transformer := transform.NewStreamTransformer("", "")
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
scanner := bufio.NewScanner(resp.Body)
const maxScanTokenSize = 4 * 1024 * 1024 // 4 MB — Claude Code tool calls produce large chunks
buf := make([]byte, maxScanTokenSize)
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
continue
}
chunk := []byte(strings.TrimPrefix(line, "data: "))
event := transformer.TransformChunk(chunk)
if event != "" {
if _, err := w.WriteString(event); err != nil {
logger.Warn("[ANTHROPIC] Stream write error [IP: %s]: %v", clientIP, err)
return
}
if err := w.Flush(); err != nil {
logger.Warn("[ANTHROPIC] Stream flush error [IP: %s]: %v", clientIP, err)
return
}
}
}
if err := scanner.Err(); err != nil {
logger.Warn("[ANTHROPIC] Stream scan error [IP: %s]: %v", clientIP, err)
return
}
// Ensure final events are sent
final := transformer.TransformChunk([]byte("[DONE]"))
if final != "" {
w.WriteString(final)
w.Flush()
}
})
// DB logging
latency := time.Since(start).Milliseconds()
if db != nil {
logEntry := &models.RequestLog{
Endpoint: c.OriginalURL(),
Method: c.Method(),
ClientIP: c.IP(),
RequestBody: models.TruncateBody(string(originalBody)),
ResponseStatus: 200,
LatencyMs: latency,
}
go db.Create(logEntry)
}
logger.Info("└─ [ANTHROPIC] <<< 200 streaming (%dms)", latency)
return nil
}