first commit
This commit is contained in:
@@ -0,0 +1,352 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"optoant/config"
|
||||
"optoant/internal/logger"
|
||||
"optoant/internal/proxy"
|
||||
"optoant/internal/transform"
|
||||
"optoant/models"
|
||||
)
|
||||
|
||||
// infoPage returns an HTML page explaining the Anthropic endpoint.
|
||||
func infoPage(c fiber.Ctx, cfg *config.Config) error {
|
||||
html := fmt.Sprintf(`<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Anthropic API — optoant</title>
|
||||
<style>
|
||||
body { font-family: -apple-system, BlinkMacSystemFont, sans-serif; max-width: 720px; margin: 60px auto; padding: 0 20px; color: #222; }
|
||||
code { background: #f0f0f0; padding: 2px 6px; border-radius: 4px; }
|
||||
pre { background: #1e1e1e; color: #d4d4d4; padding: 16px; border-radius: 8px; overflow-x: auto; }
|
||||
.ok { color: #22c55e; }
|
||||
.url { color: #6366f1; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Anthropic API — <span class="ok">aktif</span></h1>
|
||||
<p>Bu endpoint Anthropic Messages API formatını kabul eder, OpenAI formatına çevirir ve upstream'e iletir.</p>
|
||||
|
||||
<h3>Endpoint</h3>
|
||||
<code>POST http://%s/anthropic/v1/messages</code>
|
||||
|
||||
<h3>Upstream</h3>
|
||||
<code class="url">%s/v1/chat/completions</code>
|
||||
|
||||
<h3>Örnek curl</h3>
|
||||
<pre>curl -X POST "http://%s/anthropic/v1/messages" \
|
||||
-H "Authorization: Bearer $KEY" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"model": "claude-3-5-sonnet-20241022",
|
||||
"max_tokens": 1024,
|
||||
"messages": [{"role": "user", "content": "Hello!"}]
|
||||
}'</pre>
|
||||
|
||||
<p><small>Swagger UI: <a href="/swagger/">/swagger/</a> | Health: <a href="/health">/health</a></small></p>
|
||||
</body>
|
||||
</html>`, c.Hostname(), strings.TrimRight(cfg.OpenAIBackend, "/"), c.Hostname())
|
||||
return c.Type("html").SendString(html)
|
||||
}
|
||||
|
||||
// AnthropicHandler handles all /anthropic/* requests.
|
||||
// Always converts the Anthropic request to OpenAI format, forwards to OPENAI_BACKEND,
|
||||
// and converts the OpenAI response back to Anthropic format.
|
||||
//
|
||||
// @Summary Anthropic-compatible proxy
|
||||
// @Description Converts Anthropic Messages API requests to OpenAI format, forwards to OPENAI_BACKEND, and converts the response back to Anthropic format.
|
||||
// @Tags anthropic
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param path path string true "Anthropic API path (e.g. v1/messages)"
|
||||
// @Success 200 {object} map[string]interface{}
|
||||
// @Failure 400 {object} map[string]string
|
||||
// @Failure 502 {object} map[string]string
|
||||
// @Router /anthropic/{path} [post]
|
||||
func AnthropicHandler(cfg *config.Config, db *gorm.DB) fiber.Handler {
|
||||
return func(c fiber.Ctx) error {
|
||||
// ── Handle Anthropic Models API ──
|
||||
if c.Method() == "GET" && strings.HasSuffix(c.OriginalURL(), "/v1/models") {
|
||||
return modelsList(c, cfg)
|
||||
}
|
||||
|
||||
bodyBytes := c.Body()
|
||||
|
||||
// HEAD → 404 (matching DeepSeek behavior)
|
||||
if c.Method() == "HEAD" {
|
||||
return c.Status(fiber.StatusNotFound).SendString("")
|
||||
}
|
||||
// Empty body → API status
|
||||
if len(bodyBytes) == 0 {
|
||||
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": []fiber.Map{{"type": "text", "text": "optoant gateway ready"}},
|
||||
"stop_reason": "end_turn",
|
||||
})
|
||||
}
|
||||
|
||||
// Inject default model from config if request body has no model field
|
||||
if cfg.OpenAIModel != "" {
|
||||
bodyBytes = injectDefaultModel(bodyBytes, cfg.OpenAIModel)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
reqHeaders := fiberToHTTPHeaders(c)
|
||||
|
||||
// Convert x-api-key → Authorization: Bearer (Claude Code compatibility)
|
||||
if reqHeaders.Get("Authorization") == "" && reqHeaders.Get("X-Api-Key") != "" {
|
||||
reqHeaders.Set("Authorization", "Bearer "+reqHeaders.Get("X-Api-Key"))
|
||||
logger.Debug("│ 🔑 Converted x-api-key → Authorization: Bearer")
|
||||
}
|
||||
|
||||
// Auto-inject API key from .env if client didn't provide one
|
||||
if cfg.OpenAIApiKey != "" && reqHeaders.Get("Authorization") == "" {
|
||||
reqHeaders.Set("Authorization", "Bearer "+cfg.OpenAIApiKey)
|
||||
logger.Debug("│ 🔑 Auto-injected API key from OPENAI_KEY env")
|
||||
}
|
||||
|
||||
// ── LOG: Incoming Anthropic Request ──
|
||||
logger.Info("┌─ [ANTHROPIC] >>> %s %s | IP: %s", c.Method(), c.OriginalURL(), c.IP())
|
||||
logger.Debug("│ Headers: %v", proxy.MaskSensitiveHeaders(reqHeaders))
|
||||
logger.Debug("│ Body: %s", string(bodyBytes))
|
||||
|
||||
// Try Anthropic→Bifrost transform; if it fails, forward raw body to Bifrost
|
||||
converted, err := transform.AnthropicToBifrost(bodyBytes)
|
||||
useConverted := err == nil
|
||||
if !useConverted {
|
||||
logger.Debug("│ ⚠️ Not valid Anthropic format (%v) → passthrough", err)
|
||||
converted = bodyBytes
|
||||
} else {
|
||||
// ── LOG: Converted Bifrost Request ──
|
||||
logger.Debug("│ ---> CONVERTED TO OPENAI/BIFROST:")
|
||||
logger.Debug("│ %s", string(converted))
|
||||
}
|
||||
|
||||
// Forward to Bifrost — always use /v1/chat/completions, Bifrost handles routing
|
||||
targetURL := strings.TrimRight(cfg.OpenAIBackend, "/") + "/v1/chat/completions"
|
||||
logger.Debug("│ 🚀 Forwarding to: %s", targetURL)
|
||||
|
||||
// Streaming path: if original request had stream=true, handle SSE transform
|
||||
if isStreamRequest(bodyBytes) {
|
||||
return handleStreaming(c, targetURL, reqHeaders, converted, cfg, db, bodyBytes, start)
|
||||
}
|
||||
|
||||
result, err := proxy.Forward(
|
||||
c.Context(),
|
||||
c.Method(),
|
||||
targetURL,
|
||||
reqHeaders,
|
||||
bytes.NewReader(converted),
|
||||
cfg.RequestTimeoutSeconds,
|
||||
)
|
||||
|
||||
latency := time.Since(start).Milliseconds()
|
||||
statusCode := 502
|
||||
if err == nil {
|
||||
statusCode = result.StatusCode
|
||||
}
|
||||
|
||||
// DB logging
|
||||
if db != nil {
|
||||
logEntry := &models.RequestLog{
|
||||
Endpoint: c.OriginalURL(),
|
||||
Method: c.Method(),
|
||||
ClientIP: c.IP(),
|
||||
RequestBody: models.TruncateBody(string(bodyBytes)),
|
||||
ResponseStatus: statusCode,
|
||||
LatencyMs: latency,
|
||||
}
|
||||
go db.Create(logEntry) //nolint:errcheck
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Warn("│ ❌ UPSTREAM ERROR: %v", err)
|
||||
logger.Warn("└─ [ANTHROPIC] <<< 502 (%dms)", latency)
|
||||
return c.Status(fiber.StatusBadGateway).JSON(fiber.Map{
|
||||
"error": fmt.Sprintf("upstream error: %v", err),
|
||||
})
|
||||
}
|
||||
|
||||
// ── LOG: Raw Bifrost Response ──
|
||||
logger.Debug("│ <--- BIFROST RESPONSE (%d bytes, status %d):", len(result.Body), result.StatusCode)
|
||||
logger.Debug("│ %s", string(result.Body))
|
||||
|
||||
// If request was Anthropic format, convert response back to Anthropic
|
||||
if useConverted && result.StatusCode == 200 {
|
||||
antBody, terr := transform.BifrostToAnthropic(result.Body)
|
||||
if terr == nil {
|
||||
// ── LOG: Final Anthropic Response ──
|
||||
logger.Debug("│ <<< CONVERTED TO ANTHROPIC:")
|
||||
logger.Debug("│ %s", string(antBody))
|
||||
logger.Info("└─ [ANTHROPIC] <<< 200 OK (%dms)", latency)
|
||||
|
||||
c.Set("Content-Type", "application/json")
|
||||
return c.Status(result.StatusCode).Send(antBody)
|
||||
}
|
||||
logger.Warn("│ ⚠️ RESPONSE TRANSFORM FAILED: %v (forwarding raw)", terr)
|
||||
}
|
||||
|
||||
logger.Info("└─ [ANTHROPIC] <<< %d (passthrough, %dms)", result.StatusCode, latency)
|
||||
copyResponseHeaders(c, result.Headers)
|
||||
return c.Status(result.StatusCode).Send(result.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// modelsList returns the Anthropic-compatible models list, including the
|
||||
// configured OPENAI_MODEL if set.
|
||||
func modelsList(c fiber.Ctx, cfg *config.Config) error {
|
||||
type ModelData struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
DisplayName string `json:"display_name"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
models := []ModelData{}
|
||||
if cfg.OpenAIModel != "" {
|
||||
models = append(models, ModelData{
|
||||
ID: cfg.OpenAIModel,
|
||||
Type: "model",
|
||||
DisplayName: cfg.OpenAIModel,
|
||||
CreatedAt: "2026-01-01T00:00:00Z",
|
||||
})
|
||||
}
|
||||
// Add fallback models if none configured
|
||||
if len(models) == 0 {
|
||||
models = []ModelData{
|
||||
{ID: "deepseek-v4-flash", Type: "model", DisplayName: "DeepSeek V4 Flash", CreatedAt: "2026-01-01T00:00:00Z"},
|
||||
{ID: "deepseek-v4-pro", Type: "model", DisplayName: "DeepSeek V4 Pro", CreatedAt: "2026-01-01T00:00:00Z"},
|
||||
}
|
||||
}
|
||||
response := map[string]interface{}{
|
||||
"data": models,
|
||||
"has_more": false,
|
||||
"first_id": models[0].ID,
|
||||
"last_id": models[len(models)-1].ID,
|
||||
}
|
||||
body, _ := json.Marshal(response)
|
||||
logger.Info("[ANTHROPIC] GET /v1/models -> 200 OK")
|
||||
return c.Type("json").Send(body)
|
||||
}
|
||||
|
||||
// isStreamRequest checks if the original body has "stream": true.
|
||||
func isStreamRequest(body []byte) bool {
|
||||
var m map[string]interface{}
|
||||
if json.Unmarshal(body, &m) != nil {
|
||||
return false
|
||||
}
|
||||
if v, ok := m["stream"]; ok {
|
||||
if b, ok := v.(bool); ok {
|
||||
return b
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// handleStreaming forwards a request to upstream with streaming enabled,
|
||||
// transforms OpenAI SSE chunks into Anthropic SSE events, and streams
|
||||
// the result back to the client via Fiber.
|
||||
func handleStreaming(
|
||||
c fiber.Ctx,
|
||||
targetURL string,
|
||||
headers http.Header,
|
||||
body []byte,
|
||||
cfg *config.Config,
|
||||
db *gorm.DB,
|
||||
originalBody []byte,
|
||||
start time.Time,
|
||||
) error {
|
||||
// Enable streaming in the request body
|
||||
var reqMap map[string]interface{}
|
||||
if err := json.Unmarshal(body, &reqMap); err != nil {
|
||||
return c.Status(fiber.StatusInternalServerError).SendString("invalid body")
|
||||
}
|
||||
reqMap["stream"] = true
|
||||
streamBody, _ := json.Marshal(reqMap)
|
||||
|
||||
// Direct HTTP request to upstream (bypass proxy.Forward for streaming)
|
||||
ctx, cancel := context.WithTimeout(c.Context(), time.Duration(cfg.RequestTimeoutSeconds)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", targetURL, bytes.NewReader(streamBody))
|
||||
if err != nil {
|
||||
return c.Status(fiber.StatusInternalServerError).SendString("build request failed")
|
||||
}
|
||||
for key, vals := range headers {
|
||||
for _, v := range vals {
|
||||
req.Header.Add(key, v)
|
||||
}
|
||||
}
|
||||
req.Header.Set("Accept", "text/event-stream")
|
||||
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
logger.Warn("[ANTHROPIC] Stream error: %v", err)
|
||||
return c.Status(fiber.StatusBadGateway).SendString("upstream error")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
logger.Warn("[ANTHROPIC] Stream upstream %d: %s", resp.StatusCode, string(body))
|
||||
return c.Status(resp.StatusCode).Send(body)
|
||||
}
|
||||
|
||||
// Stream the SSE response
|
||||
transformer := transform.NewStreamTransformer("", "")
|
||||
c.Set("Content-Type", "text/event-stream")
|
||||
c.Set("Cache-Control", "no-cache")
|
||||
c.Set("Connection", "keep-alive")
|
||||
|
||||
c.Response().SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
chunk := []byte(strings.TrimPrefix(line, "data: "))
|
||||
event := transformer.TransformChunk(chunk)
|
||||
if event != "" {
|
||||
w.WriteString(event)
|
||||
w.Flush()
|
||||
}
|
||||
}
|
||||
// Ensure final events are sent
|
||||
final := transformer.TransformChunk([]byte("[DONE]"))
|
||||
if final != "" {
|
||||
w.WriteString(final)
|
||||
w.Flush()
|
||||
}
|
||||
})
|
||||
|
||||
// DB logging
|
||||
latency := time.Since(start).Milliseconds()
|
||||
if db != nil {
|
||||
logEntry := &models.RequestLog{
|
||||
Endpoint: c.OriginalURL(),
|
||||
Method: c.Method(),
|
||||
ClientIP: c.IP(),
|
||||
RequestBody: models.TruncateBody(string(originalBody)),
|
||||
ResponseStatus: 200,
|
||||
LatencyMs: latency,
|
||||
}
|
||||
go db.Create(logEntry)
|
||||
}
|
||||
logger.Info("└─ [ANTHROPIC] <<< 200 streaming (%dms)", latency)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user