BodySense 测试策略
BodySense 项目的完整测试策略:测试金字塔、单元测试、集成测试、E2E 测试、AI 输出测试、测试数据管理。
#type/concept
#status/growing
#tech/dev
#tech/ai
> [!info] Related notes
> - 知识地图: BodySense 项目 MOC
> - 分层: Handler-Service-Repository 分层
> - CI: GitHub Actions CI
BodySense 测试策略
测试金字塔
┌─────────────┐
│ E2E 测试 │ ← 少量,验证关键流程
│ (10%) │
├─────────────┤
│ 集成测试 │ ← 中量,验证模块协作
│ (20%) │
├─────────────┤
│ 单元测试 │ ← 大量,验证逻辑正确
│ (70%) │
└─────────────┘
1. Go 后端测试
单元测试
Service 层测试
// service/session_test.go
func TestCreateSession_ReusesExisting(t *testing.T) {
// Arrange
mockRepo := &mockSessionRepository{
existingSession: &Session{
ID: uuid.New(),
Phase: "collecting",
},
}
svc := NewSessionService(mockRepo, nil, nil)
userID := uuid.New()
// Act
session, err := svc.CreateSession(context.Background(), userID)
// Assert
assert.NoError(t, err)
assert.NotNil(t, session)
assert.Equal(t, "collecting", session.Phase)
assert.Equal(t, 0, mockRepo.createCalled) // 没有调用 Create
}
// TestCreateSession_CreatesNew checks that CreateSession calls the
// repository's Create exactly once when no in-progress session exists.
func TestCreateSession_CreatesNew(t *testing.T) {
	// Arrange: the mock repository reports no existing session.
	repo := &mockSessionRepository{existingSession: nil}
	svc := NewSessionService(repo, nil, nil)
	userID := uuid.New()

	// Act
	got, err := svc.CreateSession(context.Background(), userID)

	// Assert
	assert.NoError(t, err)
	assert.NotNil(t, got)
	assert.Equal(t, 1, repo.createCalled) // Create was called once
}
Mock 实现
// service/mock_test.go
// mockSessionRepository is a hand-written test double for the session
// repository. It returns a canned session and counts write calls so tests can
// assert on how the service used it.
type mockSessionRepository struct {
	// existingSession is what GetLastInProgress returns; nil means "none".
	existingSession *Session

	// Call counters, incremented by Create and Update respectively.
	createCalled, updateCalled int
}

// GetLastInProgress returns the configured existingSession and never fails.
func (m *mockSessionRepository) GetLastInProgress(_ context.Context, _ uuid.UUID) (*Session, error) {
	return m.existingSession, nil
}

// Create records the call and reports success without storing anything.
func (m *mockSessionRepository) Create(_ context.Context, _ *Session) error {
	m.createCalled += 1
	return nil
}

// Update records the call and reports success without storing anything.
func (m *mockSessionRepository) Update(_ context.Context, _ *Session) error {
	m.updateCalled += 1
	return nil
}
Table-Driven Tests
// service/phase_test.go
// TestIsInfoSufficient runs isInfoSufficient against a table of symptom lists
// and compares the result with the expected verdict for each case.
func TestIsInfoSufficient(t *testing.T) {
	type testCase struct {
		name     string
		symptoms []Symptom
		expected bool
	}

	cases := []testCase{
		{name: "no symptoms", symptoms: []Symptom{}, expected: false},
		{name: "symptom without details", symptoms: []Symptom{{BodyPart: "肩部"}}, expected: false},
		{name: "symptom with type", symptoms: []Symptom{{BodyPart: "肩部", SymptomType: "疼痛"}}, expected: true},
		{name: "symptom with duration", symptoms: []Symptom{{BodyPart: "肩部", Duration: "一周"}}, expected: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isInfoSufficient(tc.symptoms)
			assert.Equal(t, tc.expected, got)
		})
	}
}
HTTP Handler 测试
// handler/auth_test.go
// TestLogin_Success posts valid credentials to the login route and expects a
// 200 response whose JSON body carries a non-empty access_token.
func TestLogin_Success(t *testing.T) {
	gin.SetMode(gin.TestMode)

	// Arrange: the mock repository returns a user whose PasswordHash is derived
	// from the same password that is sent in the request body below.
	mockUser := &User{
		ID:           uuid.New(),
		Email:        "test@example.com",
		PasswordHash: hashPassword("password123"),
	}
	mockRepo := &mockUserRepository{user: mockUser}
	authHandler := NewAuthHandler(mockRepo, nil, jwtConfig)

	// The handler must exist before its method can be registered as a route.
	router := gin.New()
	router.POST("/api/v1/auth/login", authHandler.Login)

	// Act
	body := `{"email":"test@example.com","password":"password123"}`
	req := httptest.NewRequest("POST", "/api/v1/auth/login", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)

	// Assert
	assert.Equal(t, 200, w.Code)
	var response map[string]interface{}
	// A body that is not valid JSON is a failure in its own right, not just a
	// missing token.
	if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil {
		t.Fatalf("decoding login response %q: %v", w.Body.String(), err)
	}
	assert.NotEmpty(t, response["access_token"])
}
// TestLogin_InvalidCredentials posts credentials to the login route while the
// mock repository holds no user, and expects a 401 response.
func TestLogin_InvalidCredentials(t *testing.T) {
	gin.SetMode(gin.TestMode)

	// Arrange: the repository has no user to return.
	mockRepo := &mockUserRepository{user: nil}
	authHandler := NewAuthHandler(mockRepo, nil, jwtConfig)

	// The handler must exist before its method can be registered as a route.
	router := gin.New()
	router.POST("/api/v1/auth/login", authHandler.Login)

	// Act
	body := `{"email":"test@example.com","password":"wrong"}`
	req := httptest.NewRequest("POST", "/api/v1/auth/login", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	w := httptest.NewRecorder()
	router.ServeHTTP(w, req)

	// Assert
	assert.Equal(t, 401, w.Code)
}
集成测试
数据库集成测试
// repository/session_integration_test.go
//go:build integration
// TestSessionRepository_CreateAndGet round-trips a session through the test
// database: it inserts one with Create and reads it back with GetByID.
func TestSessionRepository_CreateAndGet(t *testing.T) {
	// Setup test database
	db := setupTestDB(t)
	defer cleanupTestDB(db)
	repo := NewSessionRepository(db)
	userID := uuid.New()

	// Create
	session := &Session{
		ID:     uuid.New(),
		UserID: userID,
		Status: "in_progress",
		Phase:  "collecting",
	}
	if err := repo.Create(context.Background(), session); !assert.NoError(t, err) {
		return // nothing to read back when the insert failed
	}

	// Get
	retrieved, err := repo.GetByID(context.Background(), session.ID)
	if !assert.NoError(t, err) || !assert.NotNil(t, retrieved) {
		return // stop before dereferencing a missing session
	}
	assert.Equal(t, session.ID, retrieved.ID)
	assert.Equal(t, session.Status, retrieved.Status)
}
测试数据库设置
// testutil/db.go
// setupTestDB opens a GORM connection to the test database and migrates the
// User, Session and Message models. The DSN comes from TEST_DATABASE_URL and
// falls back to a local bodysense_test database when that variable is empty.
// Any connection or migration error fails the calling test immediately.
func setupTestDB(t *testing.T) *gorm.DB {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	dsn := os.Getenv("TEST_DATABASE_URL")
	if dsn == "" {
		dsn = "postgres://localhost:5432/bodysense_test?sslmode=disable"
	}

	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		t.Fatalf("failed to connect to test database: %v", err)
	}

	// Run migrations
	if err := db.AutoMigrate(&User{}, &Session{}, &Message{}); err != nil {
		t.Fatalf("failed to migrate: %v", err)
	}
	return db
}
// cleanupTestDB empties the users, sessions and messages tables with a single
// cascading TRUNCATE. The result of the statement is not inspected.
func cleanupTestDB(db *gorm.DB) {
	const truncateAll = "TRUNCATE TABLE users, sessions, messages CASCADE"
	db.Exec(truncateAll)
}
API 集成测试
// api/auth_integration_test.go
//go:build integration
func TestAuthFlow_RegisterLoginRefresh(t *testing.T) {
app := setupTestApp(t)
defer cleanupTestApp(app)
// Register
registerResp := makeRequest(t, app, "POST", "/api/v1/auth/register", map[string]string{
"email": "test@example.com",
"password": "Password123",
"name": "Test User",
})
assert.Equal(t, 201, registerResp.StatusCode)
var registerData map[string]interface{}
json.NewDecoder(registerResp.Body).Decode(®isterData)
accessToken := registerData["access_token"].(string)
// Login
loginResp := makeRequest(t, app, "POST", "/api/v1/auth/login", map[string]string{
"email": "test@example.com",
"password": "Password123",
})
assert.Equal(t, 200, loginResp.StatusCode)
// Refresh
refreshResp := makeRequestWithAuth(t, app, "POST", "/api/v1/auth/refresh", nil, accessToken)
assert.Equal(t, 200, refreshResp.StatusCode)
}
2. Python AI 服务测试
单元测试
# tests/test_consultation_agent.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.agent import ConsultationAgent
@pytest.fixture
def mock_llm():
    """Provide an AsyncMock that is passed to the agent as its ``llm``."""
    llm_double = AsyncMock()
    return llm_double


@pytest.fixture
def mock_rag():
    """Provide an AsyncMock that is passed to the agent as its ``rag``."""
    rag_double = AsyncMock()
    return rag_double


@pytest.fixture
def agent(mock_llm, mock_rag):
    """Build a ConsultationAgent wired to the mocked llm and rag dependencies."""
    dependencies = {"llm": mock_llm, "rag": mock_rag}
    return ConsultationAgent(**dependencies)
@pytest.mark.asyncio
async def test_classify_intent_analysis(agent):
    """A request to analyse a shoulder problem is classified as request_analysis."""
    # Act
    detected = await agent.classify_intent("帮我分析一下肩膀的问题")

    # Assert
    assert detected == "request_analysis"
@pytest.mark.asyncio
async def test_classify_intent_treatment(agent):
    """A request for corrective exercises is classified as request_treatment."""
    # Act
    detected = await agent.classify_intent("给我一些改善的动作")

    # Assert
    assert detected == "request_treatment"
Table-Driven Tests (pytest)
# tests/test_intent.py
import pytest
from app.intent import classify_intent
@pytest.mark.parametrize(
    "message,expected",
    [
        # request_analysis
        ("帮我分析一下", "request_analysis"),
        ("帮我诊断", "request_analysis"),
        ("看看我的情况", "request_analysis"),
        # confirm_diagnosis
        ("确认诊断", "confirm_diagnosis"),
        ("同意这个诊断", "confirm_diagnosis"),
        # request_treatment
        ("给我改善方案", "request_treatment"),
        ("怎么矫正", "request_treatment"),
        ("练什么动作", "request_treatment"),
        # fallback
        ("其他问题", "general_question"),
    ],
)
def test_classify_intent(message, expected):
    """classify_intent returns the expected intent label for each sample message."""
    assert classify_intent(message) == expected
Function Calling 测试
# tests/test_function_calling.py
@pytest.mark.asyncio
async def test_streaming_tool_call_accumulation():
    """Tool-call fragments spread over several chunks come out as one complete call."""

    def tool_chunk(call):
        # Wrap a single tool-call delta in the streaming chunk envelope.
        return {"choices": [{"delta": {"tool_calls": [call]}}]}

    # Arrange: name first, then the JSON arguments split across two chunks,
    # then the terminating chunk.
    chunks = [
        tool_chunk({"index": 0, "id": "call_123", "function": {"name": "extract_symptom_info"}}),
        tool_chunk({"index": 0, "function": {"arguments": "{\"body_part"}}),
        tool_chunk({"index": 0, "function": {"arguments": "\":\"肩部\"}"}}),
        {"choices": [{"delta": {}, "finish_reason": "stop"}]},
    ]

    # Act
    result = [item async for item in mock_stream(chunks)]

    # Assert
    assert len(result) == 1  # tool_calls are only emitted once the stream completes
    assert result[0].tool_calls[0].name == "extract_symptom_info"
    assert result[0].tool_calls[0].arguments == {"body_part": "肩部"}
RAG 测试
# tests/test_rag.py
@pytest.mark.asyncio
async def test_rerank_with_intent():
    """For a "how do I train" query, the exercise result outranks the definition."""
    # Arrange: the definition starts with the higher similarity score.
    definition = SearchResult(title="颈椎前伸定义", unit_type="definition", similarity=0.85)
    exercise = SearchResult(title="颈椎前伸矫正动作", unit_type="exercise", similarity=0.78)
    query = "颈椎前伸怎么练"

    # Act
    reranked = rerank([definition, exercise], query)

    # Assert
    first, second = reranked[0], reranked[1]
    assert first.unit_type == "exercise"  # exercise is ranked first
    assert first.rerank_score > second.rerank_score
3. 前端测试
组件测试
// components/__tests__/ProtectedRoute.test.tsx
import { render, screen } from '@testing-library/react';
import { ProtectedRoute } from '../ProtectedRoute';
import { useAuthStore } from '@/stores/auth';
// Mock store
jest.mock('@/stores/auth');

describe('ProtectedRoute', () => {
  // Stubs the mocked auth store with the given state, then renders
  // ProtectedRoute around a marker element.
  const renderWithAuth = (isAuthenticated: boolean) => {
    (useAuthStore as jest.Mock).mockReturnValue({ isAuthenticated });
    render(
      <ProtectedRoute>
        <div>Protected Content</div>
      </ProtectedRoute>
    );
  };

  it('renders children when authenticated', () => {
    renderWithAuth(true);

    expect(screen.getByText('Protected Content')).toBeInTheDocument();
  });

  it('redirects to login when not authenticated', () => {
    renderWithAuth(false);

    // Only the absence of the protected content is asserted here.
    expect(screen.queryByText('Protected Content')).not.toBeInTheDocument();
  });
});
Hook 测试
// hooks/__tests__/useChatSSE.test.tsx
import { renderHook, act } from '@testing-library/react';
import { useChatSSE } from '../useChatSSE';
// Mock fetch
global.fetch = jest.fn();

describe('useChatSSE', () => {
  // Builds a stream reader double: the first read yields one SSE "text"
  // event, the second read signals the end of the stream.
  const makeReader = () => ({
    read: jest
      .fn()
      .mockResolvedValueOnce({
        done: false,
        value: new TextEncoder().encode('event: text\ndata: {"content": "你好"}\n\n'),
      })
      .mockResolvedValueOnce({ done: true }),
  });

  it('handles text events', async () => {
    const onText = jest.fn();
    const { result } = renderHook(() => useChatSSE({ onText }));

    // Mock SSE response
    const sseResponse = { ok: true, body: { getReader: () => makeReader() } };
    (global.fetch as jest.Mock).mockResolvedValue(sseResponse);

    await act(async () => {
      await result.current.sendMessage('session-1', '你好');
    });

    expect(onText).toHaveBeenCalledWith('你好');
  });
});
Store 测试
// stores/__tests__/auth.test.ts
import { useAuthStore } from '../auth';
describe('auth store', () => {
  // Start every test from a logged-out store.
  beforeEach(() => {
    useAuthStore.setState({ accessToken: null, refreshToken: null, isAuthenticated: false });
  });

  it('sets tokens on login', async () => {
    // The fetch double answers with a successful token payload.
    global.fetch = jest.fn().mockResolvedValue({
      ok: true,
      json: async () => ({
        access_token: 'test-token',
        refresh_token: 'test-refresh',
      }),
    });

    await useAuthStore.getState().login('test@example.com', 'password');

    const { accessToken, isAuthenticated } = useAuthStore.getState();
    expect(accessToken).toBe('test-token');
    expect(isAuthenticated).toBe(true);
  });

  it('clears tokens on logout', () => {
    useAuthStore.setState({ accessToken: 'test-token', isAuthenticated: true });

    useAuthStore.getState().logout();

    const { accessToken, isAuthenticated } = useAuthStore.getState();
    expect(accessToken).toBeNull();
    expect(isAuthenticated).toBe(false);
  });
});
4. AI 输出测试
Faithfulness 测试
# tests/test_faithfulness.py
def test_faithfulness_checker_detects_ungrounded():
    """A treatment naming an exercise absent from the knowledge list is flagged.

    The treatment lists three exercises while the knowledge entries cover only
    two of them, so the check must report the plan as unfaithful and name the
    exercise that has no supporting entry.
    """
    # Arrange
    treatment = """
建议进行以下训练:
1. 颈部后缩 - 3组,每组10次
2. 颈部旋转拉伸 - 保持30秒
3. 臀桥 - 3组,每组15次
"""
    knowledge = [
        "颈部后缩:收下巴,保持5秒",
        "臀桥:仰卧,抬臀",
    ]

    # Act
    result = check_faithfulness(treatment, knowledge)

    # Assert
    # Truthiness check instead of comparing to False with == (PEP 8).
    assert not result.faithful
    assert "颈部旋转拉伸" in result.ungrounded_exercises
Red Flag 测试
# tests/test_red_flag.py
@pytest.mark.parametrize(
    "message,should_detect",
    [
        ("肩膀有点疼", False),
        ("剧烈疼痛,无法忍受", True),
        ("手指麻木无力", True),
        ("越来越严重", True),
        ("轻微不适", False),
    ],
)
def test_red_flag_detection(message, should_detect):
    """detect_red_flags reports red flags exactly for the messages marked True."""
    outcome = detect_red_flags(message)
    assert outcome.has_red_flags == should_detect
5. E2E 测试
Playwright 测试
// e2e/auth.spec.ts
import { test, expect } from '@playwright/test';
test.describe('Authentication', () => {
  test('register and login flow', async ({ page }) => {
    const email = 'test@example.com';
    const password = 'Password123';
    // Both the register and the login form are submitted the same way.
    const submit = () => page.click('button[type="submit"]');

    // Register
    await page.goto('/register');
    await page.fill('[name="email"]', email);
    await page.fill('[name="password"]', password);
    await page.fill('[name="name"]', 'Test User');
    await submit();

    // Should redirect to dashboard
    await expect(page).toHaveURL('/dashboard');

    // Logout
    await page.click('[data-testid="logout-button"]');
    await expect(page).toHaveURL('/login');

    // Login
    await page.fill('[name="email"]', email);
    await page.fill('[name="password"]', password);
    await submit();
    await expect(page).toHaveURL('/dashboard');
  });
});
AI 对话 E2E 测试
// e2e/consultation.spec.ts
test.describe('AI Consultation', () => {
  test('complete consultation flow', async ({ page }) => {
    const aiMessage = page.locator('[data-testid="ai-message"]');
    const symptomPanel = page.locator('[data-testid="symptom-panel"]');
    const redFlagAlert = page.locator('[data-testid="red-flag-alert"]');

    await login(page);

    // Start consultation
    await page.goto('/consultation');
    await page.click('button:has-text("开始咨询")');

    // Send message
    await page.fill('[data-testid="chat-input"]', '我肩膀疼');
    await page.click('button[type="submit"]');

    // Wait for AI response (up to 30 seconds)
    await expect(aiMessage).toBeVisible({ timeout: 30000 });

    // Check symptom extraction
    await expect(symptomPanel).toContainText('肩部');

    // Check red flag (should not trigger)
    await expect(redFlagAlert).not.toBeVisible();
  });
});
6. 测试数据管理
Factory 模式
// testutil/factory.go
// UserFactory builds User values for tests. Start from NewUserFactory, override
// fields with the With* methods, then call Build or Create.
type UserFactory struct {
	email    string
	password string
	name     string
}

// NewUserFactory returns a factory with defaults: an example.com address made
// from the first 8 characters of a fresh UUID, the password "Password123" and
// the name "Test User".
func NewUserFactory() *UserFactory {
	return &UserFactory{
		email:    fmt.Sprintf("user-%s@example.com", uuid.New().String()[:8]),
		password: "Password123",
		name:     "Test User",
	}
}

// WithEmail overrides the generated email and returns the same factory so
// calls can be chained.
func (f *UserFactory) WithEmail(email string) *UserFactory {
	f.email = email
	return f
}

// Build returns a new User with a fresh ID and the factory's password hashed
// with bcrypt at cost 12. It does not persist anything.
//
// Build panics if hashing fails: a User with an empty PasswordHash would only
// surface later as a confusing login failure.
func (f *UserFactory) Build() *User {
	hash, err := bcrypt.GenerateFromPassword([]byte(f.password), 12)
	if err != nil {
		panic(fmt.Sprintf("testutil: hashing factory password: %v", err))
	}
	return &User{
		ID:           uuid.New(),
		Email:        f.email,
		PasswordHash: string(hash),
		Name:         f.name,
	}
}

// Create builds a User and stores it through repo. A failed insert stops the
// calling test immediately, so callers never receive a user that was not stored.
func (f *UserFactory) Create(t *testing.T, repo UserRepository) *User {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	user := f.Build()
	if err := repo.Create(context.Background(), user); err != nil {
		t.Fatalf("creating factory user %q: %v", user.Email, err)
	}
	return user
}
Fixture 加载
// testutil/fixtures.go
// LoadTestFixture reads testdata/<name>.json and inserts its contents into db.
// The file is a JSON object that maps a table name to a list of records, each
// record being an object of column values. Any read, parse or insert error
// fails the calling test immediately.
//
// NOTE(review): tables are visited in Go map order, which is not stable. If
// the fixture tables reference each other through foreign keys, an explicit
// ordering is probably needed — confirm against the schema.
func LoadTestFixture(t *testing.T, db *gorm.DB, name string) {
	// Report failures at the caller's line rather than inside this helper.
	t.Helper()

	data, err := os.ReadFile(fmt.Sprintf("testdata/%s.json", name))
	if err != nil {
		t.Fatalf("failed to load fixture %s: %v", name, err)
	}

	var fixtures map[string][]map[string]interface{}
	if err := json.Unmarshal(data, &fixtures); err != nil {
		t.Fatalf("failed to parse fixture %s: %v", name, err)
	}

	for table, records := range fixtures {
		for i, record := range records {
			// Check each insert so a bad record is reported where it occurs
			// instead of as missing data later in the test.
			if err := db.Table(table).Create(record).Error; err != nil {
				t.Fatalf("failed to insert fixture %s record %d into %s: %v", name, i, table, err)
			}
		}
	}
}
7. 测试配置
测试环境变量
# .env.test
# Connection string for the dedicated test database (bodysense_test).
# NOTE(review): setupTestDB in testutil/db.go reads TEST_DATABASE_URL, not
# DATABASE_URL — confirm which variable the tests are meant to use.
DATABASE_URL=postgres://localhost:5432/bodysense_test?sslmode=disable
# Redis connection; the /1 path presumably selects database index 1 — TODO confirm.
REDIS_URL=redis://localhost:6379/1
# Placeholder secrets for test runs only.
JWT_SECRET=test-secret-key
LLM_API_KEY=test-api-key
# LLM endpoint on localhost; presumably a local stub or mock server — TODO confirm.
LLM_BASE_URL=http://localhost:8100/v1
测试数据库初始化
# scripts/test-setup.sh
#!/bin/bash
# Prepare the local test database: create it, run migrations, then seed it.
#
# Abort on the first failing command, unset variable, or failed pipeline stage,
# so a broken migration is never followed by seeding.
set -euo pipefail

# Create the test database. A failure is tolerated so the script can be re-run
# when the database already exists; any real connection problem will surface
# in the migration step below.
createdb bodysense_test 2>/dev/null || echo "createdb: bodysense_test not created (it may already exist)" >&2

# Run migrations
go run cmd/migrate/main.go up

# Load test data
go run cmd/seed/main.go
8. CI 中的测试
# .github/workflows/test.yml
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: bodysense_test
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
        ports:
          - 5432:5432
        # Hold the job's steps until Postgres accepts connections.
        options: >-
          --health-cmd "pg_isready -U test -d bodysense_test"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    env:
      # Must match the service credentials above; the default DSN used by the
      # test helper carries no user or password.
      TEST_DATABASE_URL: postgres://test:test@localhost:5432/bodysense_test?sslmode=disable
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        # The integration tag includes files guarded by //go:build integration,
        # which are the tests that need the Postgres service.
        run: |
          go test -tags=integration ./... -v -coverprofile=coverage.out
          go tool cover -func=coverage.out
9. 测试覆盖率
覆盖率目标
| 层 | 目标 | 说明 |
|---|---|---|
| Service | 80%+ | 核心业务逻辑 |
| Handler | 60%+ | HTTP 协议转换 |
| Repository | 50%+ | 简单 CRUD |
| AI | 70%+ | 关键算法 |
覆盖率报告
# Go: write a coverage profile for all packages, then render it as HTML
go test ./... -coverprofile=coverage.out
go tool cover -html=coverage.out
# Python: measure coverage of the app package and produce an HTML report
pytest --cov=app --cov-report=html
# Frontend: run the test script with coverage enabled
pnpm test -- --coverage
常见面试问题
Q: 你的测试策略是什么?
A:
- 测试金字塔:70% 单元测试,20% 集成测试,10% E2E 测试
- 分层测试:Service 层用 Mock,Repository 层用测试数据库
- AI 输出测试:Faithfulness 校验、Red Flag 检测、意图分类
- CI 集成:每次提交自动运行测试
Q: 怎么测试 AI 输出?
A:
- 确定性测试:Red Flag 关键词检测、意图分类
- 质量测试:Faithfulness 校验(治疗动作是否在知识库中)
- 回归测试:固定输入,验证输出是否发生非预期变化
- 人工审核:抽样检查 AI 回复质量
Q: 怎么处理测试数据?
A:
- Factory 模式:生成测试数据
- Fixture 加载:从 JSON 文件加载
- 测试隔离:每个测试清理数据
- 测试数据库:独立数据库,不影响开发
常见错误
测试依赖外部服务
// ❌ Anti-pattern: the test calls the real LLM API.
func TestLLMCall(t *testing.T) {
response, err := callRealLLM("test prompt")
// Flaky: the outcome depends on the network and on the external API.
}
// ✅ Preferred: use a mock with a canned response.
func TestLLMCall(t *testing.T) {
mockLLM := &MockLLM{
Response: "test response",
}
response, err := mockLLM.Call("test prompt")
// Stable: nothing outside the test process is involved.
}
测试不清理数据
// ❌ Anti-pattern: no cleanup after the test, so leftover data affects other tests.
func TestCreateUser(t *testing.T) {
user := CreateUser(...)
// No cleanup: the created user is left behind.
}
// ✅ Preferred: clean up what the test created.
func TestCreateUser(t *testing.T) {
user := CreateUser(...)
// The deferred delete runs when the test function returns.
defer DeleteUser(user.ID)
}
测试覆盖不足
// ❌ Anti-pattern: only the success path is tested.
func TestLogin_Success(t *testing.T) {
// ...
}
// ✅ Preferred: cover the failure cases as well, one subtest per scenario.
func TestLogin(t *testing.T) {
t.Run("success", func(t *testing.T) { ... })
t.Run("invalid credentials", func(t *testing.T) { ... })
t.Run("account locked", func(t *testing.T) { ... })
t.Run("rate limited", func(t *testing.T) { ... })
}