345 lines
11 KiB
Go
345 lines
11 KiB
Go
package handlers
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/gofiber/fiber/v3"
|
||
"gorm.io/gorm"
|
||
|
||
"optoant/config"
|
||
"optoant/internal/logger"
|
||
"optoant/internal/proxy"
|
||
"optoant/internal/transform"
|
||
"optoant/models"
|
||
)
|
||
|
||
// infoPage returns an HTML page explaining the Anthropic endpoint.
|
||
func infoPage(c fiber.Ctx, cfg *config.Config) error {
|
||
html := fmt.Sprintf(`<!DOCTYPE html>
|
||
<html lang="en">
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<title>Anthropic API — optoant</title>
|
||
<style>
|
||
body { font-family: -apple-system, BlinkMacSystemFont, sans-serif; max-width: 720px; margin: 60px auto; padding: 0 20px; color: #222; }
|
||
code { background: #f0f0f0; padding: 2px 6px; border-radius: 4px; }
|
||
pre { background: #1e1e1e; color: #d4d4d4; padding: 16px; border-radius: 8px; overflow-x: auto; }
|
||
.ok { color: #22c55e; }
|
||
.url { color: #6366f1; }
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<h1>Anthropic API — <span class="ok">aktif</span></h1>
|
||
<p>Bu endpoint Anthropic Messages API formatını kabul eder, OpenAI formatına çevirir ve upstream'e iletir.</p>
|
||
|
||
<h3>Endpoint</h3>
|
||
<code>POST http://%s/anthropic/v1/messages</code>
|
||
|
||
<h3>Upstream</h3>
|
||
<code class="url">%s/v1/chat/completions</code>
|
||
|
||
<h3>Örnek curl</h3>
|
||
<pre>curl -X POST "http://%s/anthropic/v1/messages" \
|
||
-H "Authorization: Bearer $KEY" \
|
||
-H "Content-Type: application/json" \
|
||
-d '{
|
||
"model": "claude-3-5-sonnet-20241022",
|
||
"max_tokens": 1024,
|
||
"messages": [{"role": "user", "content": "Hello!"}]
|
||
}'</pre>
|
||
|
||
<p><small>Swagger UI: <a href="/swagger/">/swagger/</a> | Health: <a href="/health">/health</a></small></p>
|
||
</body>
|
||
</html>`, c.Hostname(), strings.TrimRight(cfg.OpenAIBackend, "/"), c.Hostname())
|
||
return c.Type("html").SendString(html)
|
||
}
|
||
|
||
// AnthropicHandler handles all /anthropic/* requests.
|
||
// Always converts the Anthropic request to OpenAI format, forwards to OPENAI_BACKEND,
|
||
// and converts the OpenAI response back to Anthropic format.
|
||
//
|
||
// @Summary Anthropic-compatible proxy
|
||
// @Description Converts Anthropic Messages API requests to OpenAI format, forwards to OPENAI_BACKEND, and converts the response back to Anthropic format.
|
||
// @Tags anthropic
|
||
// @Accept json
|
||
// @Produce json
|
||
// @Param path path string true "Anthropic API path (e.g. v1/messages)"
|
||
// @Success 200 {object} map[string]interface{}
|
||
// @Failure 400 {object} map[string]string
|
||
// @Failure 502 {object} map[string]string
|
||
// @Router /anthropic/{path} [post]
|
||
func AnthropicHandler(cfg *config.Config, db *gorm.DB) fiber.Handler {
|
||
return func(c fiber.Ctx) error {
|
||
// ── Handle Anthropic Models API ──
|
||
if c.Method() == "GET" && strings.HasSuffix(c.OriginalURL(), "/v1/models") {
|
||
return modelsList(c, cfg)
|
||
}
|
||
|
||
bodyBytes := c.Body()
|
||
|
||
// HEAD → 404 (matching DeepSeek behavior)
|
||
if c.Method() == "HEAD" {
|
||
return c.Status(fiber.StatusNotFound).SendString("")
|
||
}
|
||
// Empty body → API status
|
||
if len(bodyBytes) == 0 {
|
||
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
||
"type": "message",
|
||
"role": "assistant",
|
||
"content": []fiber.Map{{"type": "text", "text": "optoant gateway ready"}},
|
||
"stop_reason": "end_turn",
|
||
})
|
||
}
|
||
|
||
// Inject default model from config if request body has no model field
|
||
if cfg.OpenAIModel != "" {
|
||
bodyBytes = injectDefaultModel(bodyBytes, cfg.OpenAIModel)
|
||
}
|
||
|
||
start := time.Now()
|
||
reqHeaders := fiberToHTTPHeaders(c)
|
||
|
||
// Convert x-api-key → Authorization: Bearer (Claude Code compatibility)
|
||
if reqHeaders.Get("Authorization") == "" && reqHeaders.Get("X-Api-Key") != "" {
|
||
reqHeaders.Set("Authorization", "Bearer "+reqHeaders.Get("X-Api-Key"))
|
||
logger.Debug("│ 🔑 Converted x-api-key → Authorization: Bearer")
|
||
}
|
||
|
||
// Auto-inject API key from .env if client didn't provide one
|
||
if cfg.OpenAIApiKey != "" && reqHeaders.Get("Authorization") == "" {
|
||
reqHeaders.Set("Authorization", "Bearer "+cfg.OpenAIApiKey)
|
||
logger.Debug("│ 🔑 Auto-injected API key from OPENAI_KEY env")
|
||
}
|
||
|
||
// ── LOG: Incoming Anthropic Request ──
|
||
logger.Info("┌─ [ANTHROPIC] >>> %s %s | IP: %s", c.Method(), c.OriginalURL(), c.IP())
|
||
logger.Debug("│ Headers: %v", proxy.MaskSensitiveHeaders(reqHeaders))
|
||
logger.Debug("│ Body: %s", string(bodyBytes))
|
||
|
||
// Try Anthropic→Bifrost transform; if it fails, forward raw body to Bifrost
|
||
converted, err := transform.AnthropicToBifrost(bodyBytes)
|
||
useConverted := err == nil
|
||
if !useConverted {
|
||
logger.Debug("│ ⚠️ Not valid Anthropic format (%v) → passthrough", err)
|
||
converted = bodyBytes
|
||
} else {
|
||
// ── LOG: Converted Bifrost Request ──
|
||
logger.Debug("│ ---> CONVERTED TO OPENAI/BIFROST:")
|
||
logger.Debug("│ %s", string(converted))
|
||
}
|
||
|
||
// Forward to Bifrost — always use /v1/chat/completions, Bifrost handles routing
|
||
targetURL := strings.TrimRight(cfg.OpenAIBackend, "/") + "/v1/chat/completions"
|
||
logger.Debug("│ 🚀 Forwarding to: %s", targetURL)
|
||
|
||
// Streaming path: if enabled and original request had stream=true, handle SSE transform
|
||
if cfg.Streaming && isStreamRequest(bodyBytes) {
|
||
return handleStreaming(c, targetURL, reqHeaders, converted, cfg, db, bodyBytes, start)
|
||
}
|
||
|
||
result, err := proxy.Forward(
|
||
c.Context(),
|
||
c.Method(),
|
||
targetURL,
|
||
reqHeaders,
|
||
bytes.NewReader(converted),
|
||
cfg.RequestTimeoutSeconds,
|
||
)
|
||
|
||
latency := time.Since(start).Milliseconds()
|
||
statusCode := 502
|
||
if err == nil {
|
||
statusCode = result.StatusCode
|
||
}
|
||
|
||
// DB logging
|
||
if db != nil {
|
||
logEntry := &models.RequestLog{
|
||
Endpoint: c.OriginalURL(),
|
||
Method: c.Method(),
|
||
ClientIP: c.IP(),
|
||
RequestBody: models.TruncateBody(string(bodyBytes)),
|
||
ResponseStatus: statusCode,
|
||
LatencyMs: latency,
|
||
}
|
||
go db.Create(logEntry) //nolint:errcheck
|
||
}
|
||
|
||
if err != nil {
|
||
logger.Warn("│ ❌ UPSTREAM ERROR [IP: %s]: %v", c.IP(), err)
|
||
logger.Warn("└─ [ANTHROPIC] <<< 502 (%dms) | IP: %s", latency, c.IP())
|
||
return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
|
||
"error": fmt.Sprintf("upstream error: %v", err),
|
||
})
|
||
}
|
||
|
||
// ── LOG: Raw Bifrost Response ──
|
||
logger.Debug("│ <--- BIFROST RESPONSE (%d bytes, status %d):", len(result.Body), result.StatusCode)
|
||
logger.Debug("│ %s", string(result.Body))
|
||
|
||
// If request was Anthropic format, convert response back to Anthropic
|
||
if useConverted && result.StatusCode == 200 {
|
||
antBody, terr := transform.BifrostToAnthropic(result.Body)
|
||
if terr == nil {
|
||
// ── LOG: Final Anthropic Response ──
|
||
logger.Debug("│ <<< CONVERTED TO ANTHROPIC:")
|
||
logger.Debug("│ %s", string(antBody))
|
||
logger.Info("└─ [ANTHROPIC] <<< 200 OK (%dms)", latency)
|
||
|
||
c.Set("Content-Type", "application/json")
|
||
return c.Status(result.StatusCode).Send(antBody)
|
||
}
|
||
logger.Warn("│ ⚠️ RESPONSE TRANSFORM FAILED [IP: %s]: %v (forwarding raw)", c.IP(), terr)
|
||
}
|
||
|
||
logger.Info("└─ [ANTHROPIC] <<< %d (passthrough, %dms)", result.StatusCode, latency)
|
||
copyResponseHeaders(c, result.Headers)
|
||
return c.Status(result.StatusCode).Send(result.Body)
|
||
}
|
||
}
|
||
|
||
// modelsList returns the Anthropic-compatible models list, including the
|
||
// configured OPENAI_MODEL if set.
|
||
func modelsList(c fiber.Ctx, cfg *config.Config) error {
|
||
type ModelData struct {
|
||
ID string `json:"id"`
|
||
Type string `json:"type"`
|
||
DisplayName string `json:"display_name"`
|
||
CreatedAt string `json:"created_at"`
|
||
}
|
||
models := []ModelData{}
|
||
if cfg.OpenAIModel != "" {
|
||
models = append(models, ModelData{
|
||
ID: cfg.OpenAIModel,
|
||
Type: "model",
|
||
DisplayName: cfg.OpenAIModel,
|
||
CreatedAt: "2026-01-01T00:00:00Z",
|
||
})
|
||
}
|
||
// Add fallback models if none configured
|
||
if len(models) == 0 {
|
||
models = []ModelData{
|
||
{ID: "deepseek-v4-flash", Type: "model", DisplayName: "DeepSeek V4 Flash", CreatedAt: "2026-01-01T00:00:00Z"},
|
||
{ID: "deepseek-v4-pro", Type: "model", DisplayName: "DeepSeek V4 Pro", CreatedAt: "2026-01-01T00:00:00Z"},
|
||
}
|
||
}
|
||
response := map[string]interface{}{
|
||
"data": models,
|
||
"has_more": false,
|
||
"first_id": models[0].ID,
|
||
"last_id": models[len(models)-1].ID,
|
||
}
|
||
body, _ := json.Marshal(response)
|
||
logger.Info("[ANTHROPIC] GET /v1/models -> 200 OK")
|
||
return c.Type("json").Send(body)
|
||
}
|
||
|
||
// isStreamRequest checks if the original body has "stream": true.
|
||
func isStreamRequest(body []byte) bool {
|
||
var m map[string]interface{}
|
||
if json.Unmarshal(body, &m) != nil {
|
||
return false
|
||
}
|
||
if v, ok := m["stream"]; ok {
|
||
if b, ok := v.(bool); ok {
|
||
return b
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// handleStreaming forwards a request to upstream with streaming enabled,
|
||
// transforms OpenAI SSE chunks into Anthropic SSE events, and streams
|
||
// the result back to the client via Fiber.
|
||
func handleStreaming(
|
||
c fiber.Ctx,
|
||
targetURL string,
|
||
headers http.Header,
|
||
body []byte,
|
||
cfg *config.Config,
|
||
db *gorm.DB,
|
||
originalBody []byte,
|
||
start time.Time,
|
||
) error {
|
||
// Direct HTTP request to upstream (bypass proxy.Forward for streaming)
|
||
ctx, cancel := context.WithTimeout(c.Context(), time.Duration(cfg.RequestTimeoutSeconds)*time.Second)
|
||
defer cancel()
|
||
|
||
req, err := http.NewRequestWithContext(ctx, "POST", targetURL, bytes.NewReader(body))
|
||
if err != nil {
|
||
return c.Status(fiber.StatusInternalServerError).SendString("build request failed")
|
||
}
|
||
for key, vals := range headers {
|
||
for _, v := range vals {
|
||
req.Header.Add(key, v)
|
||
}
|
||
}
|
||
req.Header.Set("Accept", "text/event-stream")
|
||
|
||
client := &http.Client{}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
logger.Warn("[ANTHROPIC] Stream error [IP: %s]: %v", c.IP(), err)
|
||
return c.Status(fiber.StatusBadGateway).SendString("upstream error")
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != 200 {
|
||
body, _ := io.ReadAll(resp.Body)
|
||
logger.Warn("[ANTHROPIC] Stream upstream %d [IP: %s]: %s", resp.StatusCode, c.IP(), string(body))
|
||
return c.Status(resp.StatusCode).Send(body)
|
||
}
|
||
|
||
// Stream the SSE response
|
||
transformer := transform.NewStreamTransformer("", "")
|
||
c.Set("Content-Type", "text/event-stream")
|
||
c.Set("Cache-Control", "no-cache")
|
||
c.Set("Connection", "keep-alive")
|
||
|
||
c.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
if !strings.HasPrefix(line, "data: ") {
|
||
continue
|
||
}
|
||
chunk := []byte(strings.TrimPrefix(line, "data: "))
|
||
event := transformer.TransformChunk(chunk)
|
||
if event != "" {
|
||
w.WriteString(event)
|
||
w.Flush()
|
||
}
|
||
}
|
||
// Ensure final events are sent
|
||
final := transformer.TransformChunk([]byte("[DONE]"))
|
||
if final != "" {
|
||
w.WriteString(final)
|
||
w.Flush()
|
||
}
|
||
})
|
||
|
||
// DB logging
|
||
latency := time.Since(start).Milliseconds()
|
||
if db != nil {
|
||
logEntry := &models.RequestLog{
|
||
Endpoint: c.OriginalURL(),
|
||
Method: c.Method(),
|
||
ClientIP: c.IP(),
|
||
RequestBody: models.TruncateBody(string(originalBody)),
|
||
ResponseStatus: 200,
|
||
LatencyMs: latency,
|
||
}
|
||
go db.Create(logEntry)
|
||
}
|
||
logger.Info("└─ [ANTHROPIC] <<< 200 streaming (%dms)", latency)
|
||
return nil
|
||
}
|