BodySense 测试策略

BodySense 项目的完整测试策略:测试金字塔、单元测试、集成测试、E2E 测试、AI 输出测试、测试数据管理。

#type/concept #status/growing #tech/dev #tech/ai

[!info] related notes

BodySense 测试策略

测试金字塔

        ┌─────────────┐
        │   E2E 测试   │  ← 少量,验证关键流程
        │   (10%)      │
        ├─────────────┤
        │  集成测试     │  ← 中量,验证模块协作
        │   (20%)      │
        ├─────────────┤
        │  单元测试     │  ← 大量,验证逻辑正确
        │   (70%)      │
        └─────────────┘

1. Go 后端测试

单元测试

Service 层测试

// service/session_test.go
// TestCreateSession_ReusesExisting verifies that CreateSession succeeds
// without calling the repository's Create when the repository already reports
// an in-progress session.
func TestCreateSession_ReusesExisting(t *testing.T) {
    // Arrange: the repository already holds a session in the "collecting" phase.
    mockRepo := &mockSessionRepository{
        existingSession: &Session{
            ID:    uuid.New(),
            Phase: "collecting",
        },
    }
    svc := NewSessionService(mockRepo, nil, nil)
    userID := uuid.New()

    // Act
    session, err := svc.CreateSession(context.Background(), userID)

    // Assert: stop at a failed precondition so a nil session is reported as a
    // test failure instead of a nil-pointer panic on session.Phase below.
    if !assert.NoError(t, err) || !assert.NotNil(t, session) {
        return
    }
    assert.Equal(t, "collecting", session.Phase)
    assert.Equal(t, 0, mockRepo.createCalled) // Create must not have been called
}

// TestCreateSession_CreatesNew checks that CreateSession calls the
// repository's Create exactly once when the mock holds no existing session.
func TestCreateSession_CreatesNew(t *testing.T) {
    // Arrange: a zero-value mock, so existingSession is nil.
    repo := new(mockSessionRepository)
    service := NewSessionService(repo, nil, nil)
    owner := uuid.New()
    ctx := context.Background()

    // Act
    created, err := service.CreateSession(ctx, owner)

    // Assert
    assert.NoError(t, err)
    assert.NotNil(t, created)
    assert.Equal(t, 1, repo.createCalled) // Create was invoked once
}

Mock 实现

// service/mock_test.go
// mockSessionRepository is a hand-written test double for the session
// repository. It returns a canned session and counts write calls.
type mockSessionRepository struct {
    // existingSession is returned by GetLastInProgress; nil means "none".
    existingSession *Session
    // createCalled and updateCalled count invocations of Create and Update.
    createCalled, updateCalled int
}

// GetLastInProgress returns the preconfigured existingSession with a nil
// error; both arguments are ignored.
func (repo *mockSessionRepository) GetLastInProgress(_ context.Context, _ uuid.UUID) (*Session, error) {
    canned := repo.existingSession
    return canned, nil
}

// Create records that it was called and reports success; the session
// argument is not stored.
func (repo *mockSessionRepository) Create(_ context.Context, _ *Session) error {
    repo.createCalled += 1
    return nil
}

// Update records that it was called and reports success; the session
// argument is not stored.
func (repo *mockSessionRepository) Update(_ context.Context, _ *Session) error {
    repo.updateCalled += 1
    return nil
}

Table-Driven Tests

// service/phase_test.go
// TestIsInfoSufficient is a table-driven test for isInfoSufficient. Per the
// cases below, an empty list or a symptom with only a body part is
// insufficient, while a symptom type or a duration makes it sufficient.
func TestIsInfoSufficient(t *testing.T) {
    type testCase struct {
        name     string
        symptoms []Symptom
        expected bool
    }

    cases := []testCase{
        {name: "no symptoms", symptoms: []Symptom{}, expected: false},
        {name: "symptom without details", symptoms: []Symptom{{BodyPart: "肩部"}}, expected: false},
        {name: "symptom with type", symptoms: []Symptom{{BodyPart: "肩部", SymptomType: "疼痛"}}, expected: true},
        {name: "symptom with duration", symptoms: []Symptom{{BodyPart: "肩部", Duration: "一周"}}, expected: true},
    }

    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            got := isInfoSufficient(tc.symptoms)
            assert.Equal(t, tc.expected, got)
        })
    }
}

HTTP Handler 测试

// handler/auth_test.go
// TestLogin_Success posts valid credentials to the login route and expects a
// 200 response whose JSON body carries a non-empty access_token.
func TestLogin_Success(t *testing.T) {
    gin.SetMode(gin.TestMode)

    // Arrange: the handler must exist before its method is registered as a
    // route, otherwise the route cannot be bound to the mock-backed handler.
    mockUser := &User{
        ID:           uuid.New(),
        Email:        "test@example.com",
        PasswordHash: hashPassword("password123"),
    }
    mockRepo := &mockUserRepository{user: mockUser}
    authHandler := NewAuthHandler(mockRepo, nil, jwtConfig)

    router := gin.New()
    router.POST("/api/v1/auth/login", authHandler.Login)

    // Act
    body := `{"email":"test@example.com","password":"password123"}`
    req := httptest.NewRequest("POST", "/api/v1/auth/login", strings.NewReader(body))
    req.Header.Set("Content-Type", "application/json")
    w := httptest.NewRecorder()
    router.ServeHTTP(w, req)

    // Assert
    assert.Equal(t, 200, w.Code)
    var response map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil {
        // A malformed body would otherwise surface only as a confusing
        // "access_token is empty" failure.
        t.Fatalf("decoding login response %q: %v", w.Body.String(), err)
    }
    assert.NotEmpty(t, response["access_token"])
}

// TestLogin_InvalidCredentials posts credentials for a user the repository
// does not know and expects a 401 response.
func TestLogin_InvalidCredentials(t *testing.T) {
    gin.SetMode(gin.TestMode)

    // Arrange: build the handler first so the route is bound to the
    // mock-backed instance (the mock holds no user).
    mockRepo := &mockUserRepository{user: nil}
    authHandler := NewAuthHandler(mockRepo, nil, jwtConfig)

    router := gin.New()
    router.POST("/api/v1/auth/login", authHandler.Login)

    // Act
    body := `{"email":"test@example.com","password":"wrong"}`
    req := httptest.NewRequest("POST", "/api/v1/auth/login", strings.NewReader(body))
    req.Header.Set("Content-Type", "application/json")
    w := httptest.NewRecorder()
    router.ServeHTTP(w, req)

    // Assert
    assert.Equal(t, 401, w.Code)
}

集成测试

数据库集成测试

// repository/session_integration_test.go
//go:build integration

// TestSessionRepository_CreateAndGet writes a session through the repository
// and reads it back by ID against the test database.
func TestSessionRepository_CreateAndGet(t *testing.T) {
    // Set up the test database; the deferred cleanup also runs after t.Fatalf.
    db := setupTestDB(t)
    defer cleanupTestDB(db)

    repo := NewSessionRepository(db)
    userID := uuid.New()
    ctx := context.Background()

    // Create
    session := &Session{
        ID:     uuid.New(),
        UserID: userID,
        Status: "in_progress",
        Phase:  "collecting",
    }
    if err := repo.Create(ctx, session); err != nil {
        t.Fatalf("creating session: %v", err)
    }

    // Get: abort on failure so the field checks below never dereference a
    // nil result.
    retrieved, err := repo.GetByID(ctx, session.ID)
    if err != nil {
        t.Fatalf("getting session %v: %v", session.ID, err)
    }
    if !assert.NotNil(t, retrieved) {
        return
    }
    assert.Equal(t, session.ID, retrieved.ID)
    assert.Equal(t, session.Status, retrieved.Status)
}

测试数据库设置

// testutil/db.go
// setupTestDB opens a gorm connection to the test database and auto-migrates
// the User, Session and Message models. The DSN comes from TEST_DATABASE_URL,
// falling back to a local bodysense_test database. Any failure aborts the
// calling test.
func setupTestDB(t *testing.T) *gorm.DB {
    // Attribute failures to the calling test's line rather than this helper.
    t.Helper()

    dsn := os.Getenv("TEST_DATABASE_URL")
    if dsn == "" {
        dsn = "postgres://localhost:5432/bodysense_test?sslmode=disable"
    }

    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        t.Fatalf("failed to connect to test database: %v", err)
    }

    // Run migrations
    if err := db.AutoMigrate(&User{}, &Session{}, &Message{}); err != nil {
        t.Fatalf("failed to migrate: %v", err)
    }

    return db
}

// cleanupTestDB truncates the users, sessions and messages tables (with
// CASCADE). The result of the statement is discarded.
func cleanupTestDB(db *gorm.DB) {
    const truncateAll = "TRUNCATE TABLE users, sessions, messages CASCADE"
    db.Exec(truncateAll)
}

API 集成测试

// api/auth_integration_test.go
//go:build integration

// TestAuthFlow_RegisterLoginRefresh drives register -> login -> refresh
// through the test app and checks the status code of each step.
func TestAuthFlow_RegisterLoginRefresh(t *testing.T) {
    app := setupTestApp(t)
    defer cleanupTestApp(app)

    // Register
    registerResp := makeRequest(t, app, "POST", "/api/v1/auth/register", map[string]string{
        "email":    "test@example.com",
        "password": "Password123",
        "name":     "Test User",
    })
    assert.Equal(t, 201, registerResp.StatusCode)

    var registerData map[string]interface{}
    if err := json.NewDecoder(registerResp.Body).Decode(&registerData); err != nil {
        t.Fatalf("decoding register response: %v", err)
    }
    // Checked assertion: a missing or non-string token fails the test with a
    // readable message instead of panicking.
    accessToken, ok := registerData["access_token"].(string)
    if !ok || accessToken == "" {
        t.Fatalf("register response has no string access_token: %v", registerData)
    }

    // Login
    loginResp := makeRequest(t, app, "POST", "/api/v1/auth/login", map[string]string{
        "email":    "test@example.com",
        "password": "Password123",
    })
    assert.Equal(t, 200, loginResp.StatusCode)

    // Refresh
    // NOTE(review): the access token is sent as the credential here; confirm
    // the refresh endpoint does not expect the refresh token instead.
    refreshResp := makeRequestWithAuth(t, app, "POST", "/api/v1/auth/refresh", nil, accessToken)
    assert.Equal(t, 200, refreshResp.StatusCode)
}

2. Python AI 服务测试

单元测试

# tests/test_consultation_agent.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.agent import ConsultationAgent

@pytest.fixture
def mock_llm():
    """Provide a fresh AsyncMock standing in for the LLM client."""
    return AsyncMock()

@pytest.fixture
def mock_rag():
    """Provide a fresh AsyncMock standing in for the RAG component."""
    return AsyncMock()

@pytest.fixture
def agent(mock_llm, mock_rag):
    """Build a ConsultationAgent wired to the mocked LLM and RAG fixtures."""
    return ConsultationAgent(llm=mock_llm, rag=mock_rag)

@pytest.mark.asyncio
async def test_classify_intent_analysis(agent):
    """A request to analyse a shoulder problem maps to request_analysis."""
    user_message = "帮我分析一下肩膀的问题"
    expected_intent = "request_analysis"

    actual_intent = await agent.classify_intent(user_message)

    assert actual_intent == expected_intent

@pytest.mark.asyncio
async def test_classify_intent_treatment(agent):
    """A request for corrective exercises maps to request_treatment."""
    user_message = "给我一些改善的动作"
    expected_intent = "request_treatment"

    actual_intent = await agent.classify_intent(user_message)

    assert actual_intent == expected_intent

Table-Driven Tests (pytest)

# tests/test_intent.py
import pytest
from app.intent import classify_intent

# (message, expected intent) pairs covering each intent label.
INTENT_CASES = [
    ("帮我分析一下", "request_analysis"),
    ("帮我诊断", "request_analysis"),
    ("看看我的情况", "request_analysis"),
    ("确认诊断", "confirm_diagnosis"),
    ("同意这个诊断", "confirm_diagnosis"),
    ("给我改善方案", "request_treatment"),
    ("怎么矫正", "request_treatment"),
    ("练什么动作", "request_treatment"),
    ("其他问题", "general_question"),
]


@pytest.mark.parametrize("message,expected", INTENT_CASES)
def test_classify_intent(message, expected):
    """classify_intent returns the expected label for each sample message."""
    assert classify_intent(message) == expected

Function Calling 测试

# tests/test_function_calling.py
@pytest.mark.asyncio
async def test_streaming_tool_call_accumulation():
    """Tool-call fragments split across chunks come out as one complete call.

    The function name arrives in the first chunk and the JSON arguments are
    split over the next two; the last chunk carries only a finish_reason.
    """
    # Arrange
    stream_chunks = [
        {"choices": [{"delta": {"tool_calls": [{"index": 0, "id": "call_123", "function": {"name": "extract_symptom_info"}}]}}]},
        {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": "{\"body_part"}}]}}]},
        {"choices": [{"delta": {"tool_calls": [{"index": 0, "function": {"arguments": "\":\"肩部\"}"}}]}}]},
        {"choices": [{"delta": {}, "finish_reason": "stop"}]},
    ]

    # Act: drain the stream into a list.
    emitted = [item async for item in mock_stream(stream_chunks)]

    # Assert: tool_calls are only emitted once the call is complete.
    assert len(emitted) == 1
    first_call = emitted[0].tool_calls[0]
    assert first_call.name == "extract_symptom_info"
    assert emitted[0].tool_calls[0].arguments == {"body_part": "肩部"}

RAG 测试

# tests/test_rag.py
@pytest.mark.asyncio
async def test_rerank_with_intent():
    """An exercise hit is ranked above a higher-similarity definition hit
    when the message asks how to train."""
    # Arrange
    definition_hit = SearchResult(title="颈椎前伸定义", unit_type="definition", similarity=0.85)
    exercise_hit = SearchResult(title="颈椎前伸矫正动作", unit_type="exercise", similarity=0.78)
    query = "颈椎前伸怎么练"

    # Act
    ranked = rerank([definition_hit, exercise_hit], query)

    # Assert: the exercise result comes first with the higher rerank score.
    top = ranked[0]
    assert top.unit_type == "exercise"
    assert top.rerank_score > ranked[1].rerank_score

3. 前端测试

组件测试

// components/__tests__/ProtectedRoute.test.tsx
import { render, screen } from '@testing-library/react';
import { ProtectedRoute } from '../ProtectedRoute';
import { useAuthStore } from '@/stores/auth';

// Mock store
jest.mock('@/stores/auth');

describe('ProtectedRoute', () => {
  const mockedUseAuthStore = useAuthStore as jest.Mock;

  // Renders the guarded content with the store reporting the given auth state.
  const renderGuarded = (isAuthenticated: boolean) => {
    mockedUseAuthStore.mockReturnValue({ isAuthenticated });

    return render(
      <ProtectedRoute>
        <div>Protected Content</div>
      </ProtectedRoute>
    );
  };

  it('renders children when authenticated', () => {
    renderGuarded(true);

    const content = screen.getByText('Protected Content');
    expect(content).toBeInTheDocument();
  });

  it('redirects to login when not authenticated', () => {
    renderGuarded(false);

    // Only the absence of the protected content is asserted here.
    const content = screen.queryByText('Protected Content');
    expect(content).not.toBeInTheDocument();
  });
});

Hook 测试

// hooks/__tests__/useChatSSE.test.tsx
import { renderHook, act } from '@testing-library/react';
import { useChatSSE } from '../useChatSSE';

// Mock fetch
global.fetch = jest.fn();

describe('useChatSSE', () => {
  it('handles text events', async () => {
    const handleText = jest.fn();
    const hook = renderHook(() => useChatSSE({ onText: handleText }));

    // Each reader yields one SSE "text" frame and then signals end of stream.
    const createReader = () => {
      const frame = new TextEncoder().encode('event: text\ndata: {"content": "你好"}\n\n');
      const read = jest
        .fn()
        .mockResolvedValueOnce({ done: false, value: frame })
        .mockResolvedValueOnce({ done: true });
      return { read };
    };

    const fetchMock = global.fetch as jest.Mock;
    fetchMock.mockResolvedValue({
      ok: true,
      body: { getReader: () => createReader() },
    });

    await act(async () => {
      await hook.result.current.sendMessage('session-1', '你好');
    });

    expect(handleText).toHaveBeenCalledWith('你好');
  });
});

Store 测试

// stores/__tests__/auth.test.ts
import { useAuthStore } from '../auth';

describe('auth store', () => {
  // Remember the ambient fetch so a per-test stub cannot leak into other tests.
  const originalFetch = global.fetch;

  beforeEach(() => {
    // Start every test from a logged-out store.
    useAuthStore.setState({
      accessToken: null,
      refreshToken: null,
      isAuthenticated: false,
    });
  });

  afterEach(() => {
    global.fetch = originalFetch;
  });

  it('sets tokens on login', async () => {
    // Stub the login request with a successful token response.
    global.fetch = jest.fn().mockResolvedValue({
      ok: true,
      json: () => Promise.resolve({
        access_token: 'test-token',
        refresh_token: 'test-refresh',
      }),
    });

    await useAuthStore.getState().login('test@example.com', 'password');

    const state = useAuthStore.getState();
    expect(state.accessToken).toBe('test-token');
    expect(state.isAuthenticated).toBe(true);
  });

  it('clears tokens on logout', () => {
    // Seed a logged-in state, then log out.
    useAuthStore.setState({
      accessToken: 'test-token',
      isAuthenticated: true,
    });

    useAuthStore.getState().logout();

    const state = useAuthStore.getState();
    expect(state.accessToken).toBeNull();
    expect(state.isAuthenticated).toBe(false);
  });
});

4. AI 输出测试

Faithfulness 测试

# tests/test_faithfulness.py
def test_faithfulness_checker_detects_ungrounded():
    """An exercise absent from the knowledge snippets is reported as ungrounded.

    The plan lists three exercises, but the knowledge list only covers two of
    them, so the checker must flag the plan and name the missing exercise.
    """
    # Arrange
    plan_text = """
    建议进行以下训练:
    1. 颈部后缩 - 3组,每组10次
    2. 颈部旋转拉伸 - 保持30秒
    3. 臀桥 - 3组,每组15次
    """
    knowledge_snippets = ["颈部后缩:收下巴,保持5秒", "臀桥:仰卧,抬臀"]

    # Act
    verdict = check_faithfulness(plan_text, knowledge_snippets)

    # Assert
    assert verdict.faithful == False
    assert "颈部旋转拉伸" in verdict.ungrounded_exercises

Red Flag 测试

# tests/test_red_flag.py
# (message, whether a red flag should be raised) pairs.
RED_FLAG_CASES = [
    ("肩膀有点疼", False),
    ("剧烈疼痛,无法忍受", True),
    ("手指麻木无力", True),
    ("越来越严重", True),
    ("轻微不适", False),
]


@pytest.mark.parametrize("message,should_detect", RED_FLAG_CASES)
def test_red_flag_detection(message, should_detect):
    """detect_red_flags reports a red flag exactly for the alarming messages."""
    outcome = detect_red_flags(message)
    assert outcome.has_red_flags == should_detect

5. E2E 测试

Playwright 测试

// e2e/auth.spec.ts
import { test, expect } from '@playwright/test';

test.describe('Authentication', () => {
  test('register and login flow', async ({ page }) => {
    // A per-run address keeps registration from colliding with an account
    // left behind by a previous run against the same database.
    const email = `test-${Date.now()}@example.com`;
    const password = 'Password123';

    // Register
    await page.goto('/register');
    await page.fill('[name="email"]', email);
    await page.fill('[name="password"]', password);
    await page.fill('[name="name"]', 'Test User');
    await page.click('button[type="submit"]');

    // Should redirect to dashboard
    await expect(page).toHaveURL('/dashboard');

    // Logout
    await page.click('[data-testid="logout-button"]');
    await expect(page).toHaveURL('/login');

    // Login with the account that was just registered
    await page.fill('[name="email"]', email);
    await page.fill('[name="password"]', password);
    await page.click('button[type="submit"]');

    await expect(page).toHaveURL('/dashboard');
  });
});

AI 对话 E2E 测试

// e2e/consultation.spec.ts
test.describe('AI Consultation', () => {
  test('complete consultation flow', async ({ page }) => {
    await login(page);

    // Open the consultation page and start a session.
    await page.goto('/consultation');
    await page.click('button:has-text("开始咨询")');

    // Describe a symptom and submit it.
    await page.fill('[data-testid="chat-input"]', '我肩膀疼');
    await page.click('button[type="submit"]');

    // The AI reply may be slow; allow up to 30 seconds.
    const aiMessage = page.locator('[data-testid="ai-message"]');
    await expect(aiMessage).toBeVisible({ timeout: 30000 });

    // The extracted body part should show up in the symptom panel.
    const symptomPanel = page.locator('[data-testid="symptom-panel"]');
    await expect(symptomPanel).toContainText('肩部');

    // This message should not raise a red-flag alert.
    const redFlagAlert = page.locator('[data-testid="red-flag-alert"]');
    await expect(redFlagAlert).not.toBeVisible();
  });
});

6. 测试数据管理

Factory 模式

// testutil/factory.go
// UserFactory is a builder for test users. It holds the plain-text
// attributes that Build turns into a User.
type UserFactory struct {
    email, password, name string
}

// NewUserFactory returns a factory preloaded with defaults: an email made
// unique by the first 8 characters of a fresh UUID, the password
// "Password123" and the name "Test User".
func NewUserFactory() *UserFactory {
    suffix := uuid.New().String()[:8]

    factory := new(UserFactory)
    factory.email = fmt.Sprintf("user-%s@example.com", suffix)
    factory.password = "Password123"
    factory.name = "Test User"
    return factory
}

// WithEmail overrides the generated email and returns the same factory so
// calls can be chained.
func (f *UserFactory) WithEmail(email string) *UserFactory {
    f.email = email
    return f
}

// Build returns an in-memory User with a fresh ID and a bcrypt hash of the
// factory's password. It does not touch any repository.
//
// Build panics if hashing fails, because a silently empty PasswordHash would
// make later login tests fail for a misleading reason.
func (f *UserFactory) Build() *User {
    // bcrypt.MinCost keeps fixture creation fast; the hash still verifies
    // against the same password.
    hash, err := bcrypt.GenerateFromPassword([]byte(f.password), bcrypt.MinCost)
    if err != nil {
        panic(fmt.Sprintf("testutil: hashing factory password: %v", err))
    }
    return &User{
        ID:           uuid.New(),
        Email:        f.email,
        PasswordHash: string(hash),
        Name:         f.name,
    }
}

// Create builds a user and persists it through repo. It aborts the calling
// test if the insert fails, so callers never receive a user that was not
// stored.
func (f *UserFactory) Create(t *testing.T, repo UserRepository) *User {
    // Attribute failures to the calling test's line rather than this helper.
    t.Helper()

    user := f.Build()
    if err := repo.Create(context.Background(), user); err != nil {
        t.Fatalf("creating fixture user %q: %v", user.Email, err)
    }
    return user
}

Fixture 加载

// testutil/fixtures.go
// LoadTestFixture reads testdata/<name>.json and inserts its records into db.
// The file is a JSON object mapping a table name to a list of row objects.
// Any read, parse or insert failure aborts the calling test.
func LoadTestFixture(t *testing.T, db *gorm.DB, name string) {
    // Attribute failures to the calling test's line rather than this helper.
    t.Helper()

    data, err := os.ReadFile(fmt.Sprintf("testdata/%s.json", name))
    if err != nil {
        t.Fatalf("failed to load fixture %s: %v", name, err)
    }

    var fixtures map[string][]map[string]interface{}
    if err := json.Unmarshal(data, &fixtures); err != nil {
        t.Fatalf("failed to parse fixture %s: %v", name, err)
    }

    // NOTE(review): map iteration order is random, so tables are filled in
    // no fixed order; fixtures with cross-table foreign keys may need an
    // explicit ordering.
    for table, records := range fixtures {
        for i, record := range records {
            if err := db.Table(table).Create(record).Error; err != nil {
                t.Fatalf("failed to insert fixture %s record %d into %s: %v", name, i, table, err)
            }
        }
    }
}

7. 测试配置

测试环境变量

# .env.test
# NOTE(review): setupTestDB reads TEST_DATABASE_URL, not DATABASE_URL; it falls
# back to this same DSN when that variable is unset.
DATABASE_URL=postgres://localhost:5432/bodysense_test?sslmode=disable
# Presumably the trailing /1 selects a Redis database separate from development; confirm.
REDIS_URL=redis://localhost:6379/1
# Placeholder secrets for tests only.
JWT_SECRET=test-secret-key
LLM_API_KEY=test-api-key
# Points at a local endpoint instead of a hosted LLM API.
LLM_BASE_URL=http://localhost:8100/v1

测试数据库初始化

# scripts/test-setup.sh
#!/bin/bash
# Prepares the local test database: create it, migrate it, seed it.

# Stop at the first failing step so a broken migration is never followed by seeding.
set -euo pipefail

# Create the test database only if it does not exist yet, so the script can be re-run.
if ! psql -lqt | cut -d '|' -f 1 | grep -qw bodysense_test; then
    createdb bodysense_test
fi

# Run migrations
go run cmd/migrate/main.go up

# Load seed data
go run cmd/seed/main.go

8. CI 中的测试

# .github/workflows/test.yml
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: bodysense_test
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5432:5432
        # Hold the job's steps until Postgres accepts connections.
        options: >-
          --health-cmd "pg_isready -U test -d bodysense_test"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    env:
      # Must match the service credentials above; the fallback DSN in
      # setupTestDB carries no user or password.
      TEST_DATABASE_URL: postgres://test:test@localhost:5432/bodysense_test?sslmode=disable

    steps:
      - uses: actions/checkout@v4

      - name: Run tests
        run: |
          go test ./... -v -coverprofile=coverage.out
          go tool cover -func=coverage.out

      # Files guarded by "//go:build integration" are skipped without this tag.
      - name: Run integration tests
        run: go test -tags=integration ./... -v

9. 测试覆盖率

覆盖率目标

| 层级 | 目标 | 说明 |
| --- | --- | --- |
| Service | 80%+ | 核心业务逻辑 |
| Handler | 60%+ | HTTP 协议转换 |
| Repository | 50%+ | 简单 CRUD |
| AI | 70%+ | 关键算法 |

覆盖率报告

# Go: write a coverage profile to coverage.out, then render it as HTML.
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out

# Python: measure coverage of the app package and emit an HTML report.
pytest --cov=app --cov-report=html

# Frontend: forward --coverage to the test script.
pnpm test -- --coverage

常见面试问题

Q: 你的测试策略是什么?

A:

  1. 测试金字塔:70% 单元测试,20% 集成测试,10% E2E 测试
  2. 分层测试:Service 层用 Mock,Repository 层用测试数据库
  3. AI 输出测试:Faithfulness 校验、Red Flag 检测、意图分类
  4. CI 集成:每次提交自动运行测试

Q: 怎么测试 AI 输出?

A:

  1. 确定性测试:Red Flag 关键词检测、意图分类
  2. 质量测试:Faithfulness 校验(治疗动作是否在知识库中)
  3. 回归测试:固定输入,验证输出变化
  4. 人工审核:抽样检查 AI 回复质量

Q: 怎么处理测试数据?

A:

  1. Factory 模式:生成测试数据
  2. Fixture 加载:从 JSON 文件加载
  3. 测试隔离:每个测试清理数据
  4. 测试数据库:独立数据库,不影响开发

常见错误

测试依赖外部服务

// ❌ Anti-pattern: the test depends on the real API.
func TestLLMCall(t *testing.T) {
    response, err := callRealLLM("test prompt")
    // Flaky: the outcome depends on the network and the remote API.
}

// ✅ Preferred: use a mock.
func TestLLMCall(t *testing.T) {
    mockLLM := &MockLLM{
        Response: "test response",
    }
    response, err := mockLLM.Call("test prompt")
    // Stable: no dependency on external services.
}

测试不清理数据

// ❌ Anti-pattern: no cleanup after the test, which affects other tests.
func TestCreateUser(t *testing.T) {
    user := CreateUser(...)
    // No cleanup.
}

// ✅ Preferred: clean up after the test.
func TestCreateUser(t *testing.T) {
    user := CreateUser(...)
    // The deferred delete runs when the test function returns.
    defer DeleteUser(user.ID)
}

测试覆盖不足

// ❌ Anti-pattern: only the success path is tested.
func TestLogin_Success(t *testing.T) {
    // ...
}

// ✅ Preferred: cover each scenario as its own subtest.
func TestLogin(t *testing.T) {
    t.Run("success", func(t *testing.T) { ... })
    t.Run("invalid credentials", func(t *testing.T) { ... })
    t.Run("account locked", func(t *testing.T) { ... })
    t.Run("rate limited", func(t *testing.T) { ... })
}
创建于 2026/6/25 更新于 2026/6/25