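1. Introduction: Why AI Citation Rate Matters
Since 2024, AI search engines and chat assistants such as ChatGPT, Perplexity, Kimi, Doubao, and DeepSeek have been reshaping how users search. A growing number of users skip the traditional search engine and put their questions directly to an AI assistant. That opens a new front for SEO: GEO (Generative Engine Optimization).
The core GEO metric is the AI citation rate: how often your brand, content, or product is mentioned and cited in the answers of the major AI engines. Do you know how many times more often your competitors appear in AI answers than you do? Can you find out promptly which page of your site an AI cited, what content it cited, and at what position and in what context?
Mature AI citation monitoring tools are still rare, and most of them are paid SaaS products. A technical team can build its own AI citation monitoring and diagnosis system in Python. This article walks through building that system from scratch, with code for keyword library construction, multi-source collection, citation detection, position extraction, storage, a visualization dashboard, and anomaly alerting.
Intended readers: developers, SEO engineers, and data analysts. You need basic Python, basic database skills, and a little experience debugging APIs.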
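2. System Architecture Overview
Before writing code, let us look at the overall architecture. The system has the following core components:
- Keyword library: brand terms, product terms, generic terms, and long-tail question terms for your industry
- Multi-source collection layer: sends questions to ChatGPT, Perplexity, Claude, Kimi, ERNIE Bot (文心一言), Tongyi Qianwen (通义千问), Doubao, DeepSeek, and other engines through APIs or automation, and captures the answers
- Citation detection layer: applies regex matching and semantic matching to each answer to decide whether your brand or content was cited
- Citation parsing layer: extracts the citation position (where in the answer you are mentioned), the citation context (surrounding text), and the cited source URL
- Storage layer: persists monitoring data in PostgreSQL or MySQL
- Dashboard: an interactive dashboard built with Streamlit and Plotly
- Alerting layer: pushes an alert through a WeCom (企业微信) or Feishu webhook when the citation rate moves abnormally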
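3. Environment Setup
3.1 Python environment
Make sure Python 3.10 or later is installed. A conda or venv virtual environment is recommended:
conda create -n geo_monitor python=3.11
conda activate geo_monitor
3.2 Installing dependencies
The core dependencies can be installed with a single pip command. pyyaml, transformers, and torch are included because the scheduled runner in section 10 reads config.yaml and the sentiment analyzer in section 6.3 loads a HuggingFace model:
pip install openai anthropic requests beautifulsoup4 sqlalchemy psycopg2-binary streamlit plotly pandas numpy schedule python-dotenv pyyaml transformers torch dashscope zhipuai volcengine
What each library is for:
- openai: calls the ChatGPT API; also used for Perplexity, DeepSeek, and other OpenAI-compatible endpoints
- anthropic: calls the Claude API
- dashscope: Alibaba Cloud DashScope model service, used to call Tongyi Qianwen
- zhipuai: Zhipu AI, used to call the ChatGLM model family
- volcengine: Volcano Engine, used to call the Doubao model family
- sqlalchemy and psycopg2-binary: ORM and PostgreSQL driver; use pymysql instead for MySQL
- streamlit and plotly: build the visualization dashboard
- schedule: schedules recurring jobs
- python-dotenv: manages API keys and other sensitive configuration
- pyyaml: reads config.yaml
- transformers and torch: run the sentiment model
3.3 Configuration file
Create a .env file in the project root to hold the API keys for each platform. PERPLEXITY_API_KEY and DATABASE_URL are listed because cron_runner.py reads them:
# .env environment variables
OPENAI_API_KEY=sk-your-openai-key
ANTHROPIC_API_KEY=sk-ant-your-anthropic-key
PERPLEXITY_API_KEY=your-perplexity-key
DASHSCOPE_API_KEY=sk-dashscope-key
ZHIPUAI_API_KEY=your-zhipuai-key
VOLCENGINE_ACCESS_KEY=your-ak
VOLCENGINE_SECRET_KEY=your-sk
KIMI_API_KEY=sk-your-moonshot-key
DEEPSEEK_API_KEY=sk-your-deepseek-key
DB_HOST=localhost
DB_PORT=5432
DB_NAME=geo_monitor
DB_USER=postgres
DB_PASSWORD=your-password
DATABASE_URL=postgresql://postgres:your-password@localhost:5432/geo_monitor
WEWORK_WEBHOOK=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx
FEISHU_WEBHOOK=https://open.feishu.cn/open-apis/bot/v2/hook/xxx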
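4. Building the Keyword Library
The keyword library is the foundation of the monitoring system. A good library covers the following kinds of terms:
4.1 Keyword categories
- Brand terms: company name, product names, founder names, and so on
- Product terms: product line names, feature module names, technology stack names
- Generic terms: common industry terms, such as "AI citation monitoring", "GEO optimization", "generative search engine optimization"
- Long-tail question terms: natural-language questions users might ask, such as "how to improve AI search citation rate" or "what is the difference between GEO and SEO"
4.2 Database table
The keyword table is created with the following SQL: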
CREATE TABLE keywords (
id SERIAL PRIMARY KEY,
keyword TEXT NOT NULL,
category VARCHAR(50) NOT NULL,
target_industry VARCHAR(100),
priority INT DEFAULT 3,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_keywords_category ON keywords(category);
CREATE INDEX idx_keywords_status ON keywords(status);
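You can fill the keyword table in bulk with a Python script or an Excel import. As a rule of thumb, a mid-sized brand needs roughly 200 to 500 keywords for adequate monitoring coverage.
5. Multi-Source AI Engine Collection Layer
This is the most central and most complex part of the system. We implement one abstract collector class and then a concrete adapter for each AI engine.
5.1 Collector base class
# collector_base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class QueryResult:
    """One answer returned by one engine for one keyword."""
    engine: str
    keyword: str
    query: str
    full_response: str
    citations: list = field(default_factory=list)
    raw_metadata: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    error: Optional[str] = None


class AICollectorBase(ABC):
    def __init__(self, engine_name: str, api_key: str):
        self.engine_name = engine_name
        self.api_key = api_key

    @abstractmethod
    def query(self, keyword: str, prompt_template: Optional[str] = None) -> QueryResult:
        """Send the keyword to the engine and wrap the answer in a QueryResult."""

    def build_prompt(self, keyword: str) -> str:
        # Default prompt (in Chinese): asks for a detailed answer and for the
        # URLs or names of any external sources used.
        return f"请详细回答以下问题。如果引用了任何外部来源,请列出参考来源的URL或名称:{keyword}"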
5.2 ChatGPT collector
ChatGPT is one of the most widely used AI assistants. We call it through the official OpenAI Python SDK:
# collectors/chatgpt_collector.py
from typing import Optional

import openai

from collector_base import AICollectorBase, QueryResult


class ChatGPTCollector(AICollectorBase):
    def __init__(self, api_key: str, model: str = "gpt-4o"):
        super().__init__("ChatGPT", api_key)
        self.model = model
        self.client = openai.OpenAI(api_key=api_key)

    def query(self, keyword: str, prompt_template: Optional[str] = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    # System prompt (in Chinese): be helpful and provide sources.
                    {"role": "system", "content": "你是一个乐于助人的AI助手,回答时请提供引用来源。"},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                max_tokens=4096,
            )
            content = response.choices[0].message.content
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=content,
                raw_metadata={
                    "model": self.model,
                    "usage": response.usage.model_dump() if response.usage else {},
                    "finish_reason": response.choices[0].finish_reason,
                },
            )
        except Exception as e:
            # Return the failure as data so one bad call does not stop the batch.
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )
5.3 Perplexity collector
Perplexity is known for its strong citation behavior and is one of the most important data sources for GEO monitoring. Its API is OpenAI-compatible, and the response additionally carries a citations field that lists the cited URLs directly:
# collectors/perplexity_collector.py
from typing import Optional

import openai

from collector_base import AICollectorBase, QueryResult


class PerplexityCollector(AICollectorBase):
    def __init__(self, api_key: str, model: str = "sonar-pro"):
        super().__init__("Perplexity", api_key)
        self.model = model
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url="https://api.perplexity.ai"
        )

    def query(self, keyword: str, prompt_template: Optional[str] = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=4096,
            )
            content = response.choices[0].message.content
            # citations is not part of the OpenAI schema; fall back to an empty
            # list when the field is missing or None.
            raw_citations = getattr(response, 'citations', None) or []
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=content,
                citations=raw_citations,
                raw_metadata={"model": self.model, "citation_count": len(raw_citations)},
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )
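The citations list only becomes useful once it is tied back to the answer text. The sketch below assumes that the answer refers to sources with bracketed markers such as [1] and [2], and that marker n is a 1-based index into the citations list; both are assumptions to verify against real responses. map_citation_markers is a hypothetical helper, not part of any SDK.

```python
import re


def map_citation_markers(answer: str, citations: list[str]) -> list[dict]:
    """Resolve [n] markers in an answer to URLs, in order of first appearance.

    Assumes marker n points at citations[n - 1]; out-of-range markers are skipped.
    """
    resolved = []
    seen = set()
    for match in re.finditer(r"\[(\d+)\]", answer):
        index = int(match.group(1))
        if index in seen or not (1 <= index <= len(citations)):
            continue
        seen.add(index)
        resolved.append({
            "marker": index,
            "url": citations[index - 1],
            "first_position": match.start(),
        })
    return resolved


if __name__ == "__main__":
    demo_answer = "Tool A is popular [1]. Tool B is newer [2][1]. See also [5]."
    demo_citations = ["https://a.example/page", "https://b.example/post"]
    for item in map_citation_markers(demo_answer, demo_citations):
        print(item)
```

Each resolved URL can then be checked against your own domains, which gives a source-level citation signal in addition to brand-name matching in the text.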
5.4 DeepSeek collector
DeepSeek has performed strongly among Chinese-language AI assistants in recent years, and its API is also OpenAI-compatible. Unlike the Perplexity collector, this one records no structured citations, so detection relies on the answer text alone:
# collectors/deepseek_collector.py
from typing import Optional

import openai

from collector_base import AICollectorBase, QueryResult


class DeepSeekCollector(AICollectorBase):
    def __init__(self, api_key: str, model: str = "deepseek-chat"):
        super().__init__("DeepSeek", api_key)
        self.model = model
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com"
        )

    def query(self, keyword: str, prompt_template: Optional[str] = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=4096,
            )
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=response.choices[0].message.content,
                raw_metadata={"model": self.model},
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )
5.5 Tongyi Qianwen collector
Alibaba Cloud's Tongyi Qianwen (通义千问) is called through the DashScope SDK. It offers a search-augmented mode, which matters a great deal for GEO monitoring:
# collectors/qwen_collector.py
from typing import Optional

import dashscope
from dashscope import Generation

from collector_base import AICollectorBase, QueryResult


class QwenCollector(AICollectorBase):
    def __init__(self, api_key: str, model: str = "qwen-max"):
        super().__init__("通义千问", api_key)
        self.model = model
        dashscope.api_key = api_key

    def query(self, keyword: str, prompt_template: Optional[str] = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = Generation.call(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                result_format='message',
                enable_search=True,  # turn on search augmentation
                search_options={
                    "enable_citation": True,
                    "enable_source": True,
                },
            )
            if response.status_code != 200:
                return QueryResult(
                    engine=self.engine_name, keyword=keyword,
                    query=prompt, full_response="",
                    error=f"API returned an error: {response.message}"
                )
            content = response.output.choices[0].message.content
            # search_info may be absent when no search was performed.
            search_info = getattr(response.output, 'search_info', {}) or {}
            citations = search_info.get('search_results', [])
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=content,
                citations=citations,
                raw_metadata={"model": self.model, "search_enabled": True},
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )
5.6 Unified scheduler
With the collectors in place, we can write a scheduler that runs the collection jobs for all engines in one batch. It uses a thread pool to run queries concurrently, and each worker sleeps for a random interval before calling the API so that requests are spread out and less likely to hit rate limits:
# scheduler.py
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

from collectors.chatgpt_collector import ChatGPTCollector
from collectors.perplexity_collector import PerplexityCollector
from collectors.deepseek_collector import DeepSeekCollector
from collectors.qwen_collector import QwenCollector


class CollectorScheduler:
    def __init__(self, config: dict):
        self.config = config
        self.collectors = self._init_collectors()

    def _init_collectors(self) -> list:
        # Only engines with a configured API key get a collector.
        collectors = []
        if self.config.get('openai_key'):
            collectors.append(ChatGPTCollector(self.config['openai_key']))
        if self.config.get('perplexity_key'):
            collectors.append(PerplexityCollector(self.config['perplexity_key']))
        if self.config.get('deepseek_key'):
            collectors.append(DeepSeekCollector(self.config['deepseek_key']))
        if self.config.get('dashscope_key'):
            collectors.append(QwenCollector(self.config['dashscope_key']))
        return collectors

    def run_batch(self, keywords: list, max_workers: int = 3) -> list:
        all_results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self._safe_collect, collector, kw)
                for collector in self.collectors
                for kw in keywords
            ]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    all_results.append(result)
        return all_results

    def _safe_collect(self, collector, keyword: str):
        # Sleep inside the worker: a delay in the result loop would only slow
        # down result handling, not the API calls themselves.
        time.sleep(random.uniform(0.5, 2.0))
        try:
            return collector.query(keyword)
        except Exception as e:
            print(f"[ERROR] {collector.engine_name} query '{keyword}' failed: {e}")
            return None
6. Citation Detection and Parsing
6.1 Detection strategy
Citation detection is the part of the system that tests engineering skill the most. AI answers are natural language: they may name your brand directly, or refer to it through a variant, an abbreviation, or a paraphrase. We use a three-stage strategy:
- Stage 1, exact matching: use regular expressions to match hard identifiers such as brand names, product names, and domains
- Stage 2, semantic matching: use a text embedding model to compute the cosine similarity between the answer text and a brand description; OpenAI's text-embedding-3-small is a reasonable choice
- Stage 3, manual spot checks: manually review samples where the first two stages disagree, and keep refining the rules
6.2 Core detection code
# citation_detector.py
import re
from typing import List, Optional

import numpy as np


class CitationDetector:
    def __init__(self, brand_patterns: List[str],
                 brand_aliases: Optional[List[str]] = None,
                 domains: Optional[List[str]] = None):
        self.brand_patterns = brand_patterns
        self.brand_aliases = brand_aliases or []
        self.domains = domains or []
        all_terms = brand_patterns + self.brand_aliases
        # An empty alternation would match at every position, so skip it.
        self.pattern = re.compile(
            '|'.join(re.escape(t) for t in all_terms),
            re.IGNORECASE
        ) if all_terms else None
        if self.domains:
            domain_pattern = '|'.join(re.escape(d) for d in self.domains)
            # In a raw string a word boundary is written \b; the doubled
            # backslash would match a literal backslash followed by "b".
            self.domain_pattern = re.compile(
                rf'(https?://)?(?:www\.)?({domain_pattern})\b',
                re.IGNORECASE
            )
        else:
            self.domain_pattern = None

    def _collect(self, pattern, text: str, match_type: str) -> List[dict]:
        if pattern is None:
            return []
        return [{
            'type': match_type,
            'matched_text': m.group(),
            'position': m.start(),
            'context': self._get_context(text, m.start(), m.end())
        } for m in pattern.finditer(text)]

    def detect_exact_match(self, text: str) -> List[dict]:
        return self._collect(self.pattern, text, 'exact')

    def detect_domain_match(self, text: str) -> List[dict]:
        return self._collect(self.domain_pattern, text, 'domain')

    def compute_semantic_score(self, text: str, brand_embedding,
                               text_embedding) -> float:
        """Cosine similarity between the brand and answer embeddings."""
        if text_embedding is None or brand_embedding is None:
            return 0.0
        dot_product = np.dot(brand_embedding, text_embedding)
        norm_a = np.linalg.norm(brand_embedding)
        norm_b = np.linalg.norm(text_embedding)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(dot_product / (norm_a * norm_b))

    def analyze_citation(self, response_text: str,
                         brand_embedding=None,
                         text_embedding=None,
                         semantic_threshold: float = 0.75) -> dict:
        result = {
            'is_cited': False,
            'exact_matches': [],
            'domain_matches': [],
            'semantic_score': 0.0,
            'citation_rank': None,
            'citation_context': None
        }
        exact_matches = self.detect_exact_match(response_text)
        result['exact_matches'] = exact_matches
        domain_matches = self.detect_domain_match(response_text)
        result['domain_matches'] = domain_matches
        if brand_embedding is not None and text_embedding is not None:
            score = self.compute_semantic_score(
                response_text, brand_embedding, text_embedding
            )
            result['semantic_score'] = score
            if score >= semantic_threshold:
                result['is_cited'] = True
        all_matches = exact_matches + domain_matches
        if all_matches:
            result['is_cited'] = True
            # Take the earliest match overall, not just the first exact match.
            first_match = min(all_matches, key=lambda m: m['position'])
            # citation_rank holds the character offset of the earliest mention,
            # a proxy for position; it is not an ordinal rank among brands.
            result['citation_rank'] = first_match['position']
            result['citation_context'] = first_match['context']
        return result

    def _get_context(self, text: str, start: int, end: int,
                     window: int = 100) -> str:
        ctx_start = max(0, start - window)
        ctx_end = min(len(text), end + window)
        prefix = "..." if ctx_start > 0 else ""
        suffix = "..." if ctx_end < len(text) else ""
        return prefix + text[ctx_start:ctx_end] + suffix
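The detector above stores a character offset in citation_rank, which is hard to compare across answers of different lengths. If you also track competitor names, an ordinal rank ("mentioned second") can be derived instead. The sketch below is one possible approach; ordinal_rank and the competitor_terms mapping are hypothetical names introduced here, and plain case-insensitive substring search stands in for the regex logic of CitationDetector.

```python
from typing import Optional


def ordinal_rank(text: str, own_terms: list[str],
                 competitor_terms: dict[str, list[str]]) -> Optional[int]:
    """Return 1 if our brand is mentioned first, 2 if one competitor precedes it, etc.

    Returns None when none of own_terms appears in the text.
    """
    lowered = text.lower()

    def first_position(terms: list[str]) -> Optional[int]:
        # Earliest occurrence of any alias, or None if no alias is present.
        positions = [lowered.find(term.lower()) for term in terms]
        positions = [p for p in positions if p >= 0]
        return min(positions) if positions else None

    own_position = first_position(own_terms)
    if own_position is None:
        return None
    earlier = 0
    for aliases in competitor_terms.values():
        position = first_position(aliases)
        if position is not None and position < own_position:
            earlier += 1
    return earlier + 1


if __name__ == "__main__":
    answer = "Popular options include BrandB, YunFan and BrandC."
    print(ordinal_rank(answer, ["云帆", "YunFan"],
                       {"B": ["BrandB"], "C": ["BrandC"]}))
```

A value computed this way fits the INT citation_rank column and makes the rank distribution chart in the dashboard easier to read, at the cost of maintaining a competitor list.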
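6.3 Citation position and sentiment analysis
Where your brand appears in an AI answer is a key metric. Being mentioned first usually suggests that the engine treats your brand as highly relevant to the question, although that is a heuristic rather than a documented ranking signal. We also want the sentiment of each mention: positive, neutral, or negative. Below is a Chinese sentiment analysis implementation based on HuggingFace. The model used here is a binary classifier, so it only distinguishes positive from negative; NEUTRAL is returned only as a fallback for empty or very short input:
# sentiment_analyzer.py
from transformers import pipeline


class SentimentAnalyzer:
    def __init__(self):
        self.classifier = pipeline(
            "sentiment-analysis",
            model="uer/roberta-base-finetuned-jd-binary-chinese"
        )

    def analyze(self, context_text: str) -> dict:
        if not context_text or len(context_text.strip()) < 5:
            return {"label": "NEUTRAL", "score": 0.5}
        try:
            # Truncate by tokens, not characters: 512 characters plus the
            # special tokens can exceed the model's 512-token limit.
            result = self.classifier(context_text, truncation=True, max_length=512)
            return result[0]
        except Exception as e:
            print(f"Sentiment analysis failed: {e}")
            return {"label": "UNKNOWN", "score": 0.0}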
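7. Database Design and Storage
7.1 Full table structure
We use PostgreSQL as the primary database, with the SQLAlchemy ORM for object-relational mapping. The design has three core tables: a query log table, a citation detail table, and a daily summary table.
-- Main query log table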
CREATE TABLE query_logs (
id SERIAL PRIMARY KEY,
engine VARCHAR(50) NOT NULL,
keyword_id INT REFERENCES keywords(id),
keyword TEXT NOT NULL,
query_text TEXT NOT NULL,
response_text TEXT NOT NULL,
response_length INT,
is_cited BOOLEAN DEFAULT FALSE,
citation_count INT DEFAULT 0,
citation_rank INT,
semantic_score FLOAT,
sentiment_label VARCHAR(20),
sentiment_score FLOAT,
raw_response_json JSONB,
error_message TEXT,
query_timestamp TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_query_logs_engine ON query_logs(engine);
CREATE INDEX idx_query_logs_is_cited ON query_logs(is_cited);
CREATE INDEX idx_query_logs_timestamp ON query_logs(query_timestamp);
CREATE INDEX idx_query_logs_engine_cited ON query_logs(engine, is_cited);
-- Citation detail table
CREATE TABLE citation_details (
id SERIAL PRIMARY KEY,
query_log_id INT REFERENCES query_logs(id),
match_type VARCHAR(20) NOT NULL,
matched_text TEXT NOT NULL,
match_position INT,
context_before TEXT,
context_after TEXT,
cited_url TEXT,
sentiment_label VARCHAR(20),
sentiment_score FLOAT,
created_at TIMESTAMP DEFAULT NOW()
);
-- Daily summary statistics table
CREATE TABLE daily_stats (
id SERIAL PRIMARY KEY,
stat_date DATE NOT NULL,
engine VARCHAR(50) NOT NULL,
keyword_id INT REFERENCES keywords(id),
total_queries INT DEFAULT 0,
cited_queries INT DEFAULT 0,
citation_rate FLOAT DEFAULT 0.0,
avg_citation_rank FLOAT,
avg_semantic_score FLOAT,
positive_count INT DEFAULT 0,
negative_count INT DEFAULT 0,
neutral_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(stat_date, engine, keyword_id)
);
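The article defines daily_stats but does not show how it is filled. One option is to roll up query_logs rows once a day. The sketch below does the aggregation in plain Python over dictionaries shaped like query_logs rows; aggregate_daily is a hypothetical helper, and it assumes keyword_id is an integer on every row. In production the same rollup could equally be done with a GROUP BY query.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_daily(logs: list[dict]) -> list[dict]:
    """Group log rows by (date, engine, keyword_id) and compute daily_stats fields."""
    buckets = defaultdict(lambda: {"total": 0, "cited": 0, "ranks": []})
    for row in logs:
        key = (row["query_timestamp"].date(), row["engine"], row["keyword_id"])
        bucket = buckets[key]
        bucket["total"] += 1
        if row["is_cited"]:
            bucket["cited"] += 1
            if row.get("citation_rank") is not None:
                bucket["ranks"].append(row["citation_rank"])

    stats = []
    for (stat_date, engine, keyword_id), bucket in sorted(buckets.items()):
        ranks = bucket["ranks"]
        stats.append({
            "stat_date": stat_date,
            "engine": engine,
            "keyword_id": keyword_id,
            "total_queries": bucket["total"],
            "cited_queries": bucket["cited"],
            "citation_rate": bucket["cited"] / bucket["total"],
            "avg_citation_rank": sum(ranks) / len(ranks) if ranks else None,
        })
    return stats


if __name__ == "__main__":
    sample = [
        {"query_timestamp": datetime(2025, 1, 1, 9), "engine": "ChatGPT",
         "keyword_id": 1, "is_cited": True, "citation_rank": 1},
        {"query_timestamp": datetime(2025, 1, 1, 13), "engine": "ChatGPT",
         "keyword_id": 1, "is_cited": False, "citation_rank": None},
    ]
    print(aggregate_daily(sample))
```

Because daily_stats has a UNIQUE(stat_date, engine, keyword_id) constraint, the resulting rows are best written with an upsert so the job can be re-run safely.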
7.2 SQLAlchemy data access layer
SQLAlchemy makes the database easy to work with. Below are the model definitions and a database manager:
# database.py
from datetime import datetime

from sqlalchemy import (
    create_engine, Column, Integer, String, Float, Boolean,
    Text, DateTime, ForeignKey, JSON,
)
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4.
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

Base = declarative_base()


class Keyword(Base):
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)
    keyword = Column(String(500), nullable=False)
    category = Column(String(50), nullable=False)
    target_industry = Column(String(100))
    priority = Column(Integer, default=3)
    status = Column(String(20), default='active')
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class QueryLog(Base):
    __tablename__ = 'query_logs'
    id = Column(Integer, primary_key=True)
    engine = Column(String(50), nullable=False)
    keyword_id = Column(Integer, ForeignKey('keywords.id'))
    keyword = Column(String(500), nullable=False)
    query_text = Column(Text)
    response_text = Column(Text)
    response_length = Column(Integer)
    is_cited = Column(Boolean, default=False)
    citation_count = Column(Integer, default=0)
    citation_rank = Column(Integer)
    semantic_score = Column(Float)
    sentiment_label = Column(String(20))
    sentiment_score = Column(Float)
    raw_response_json = Column(JSON)
    error_message = Column(Text)
    query_timestamp = Column(DateTime, default=datetime.now)
    created_at = Column(DateTime, default=datetime.now)
    citations = relationship("CitationDetail", back_populates="query_log")


class CitationDetail(Base):
    __tablename__ = 'citation_details'
    id = Column(Integer, primary_key=True)
    query_log_id = Column(Integer, ForeignKey('query_logs.id'))
    match_type = Column(String(20), nullable=False)
    matched_text = Column(Text, nullable=False)
    match_position = Column(Integer)
    context_before = Column(Text)
    context_after = Column(Text)
    cited_url = Column(Text)
    sentiment_label = Column(String(20))
    sentiment_score = Column(Float)
    created_at = Column(DateTime, default=datetime.now)
    query_log = relationship("QueryLog", back_populates="citations")


class DatabaseManager:
    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def init_tables(self):
        Base.metadata.create_all(self.engine)

    def save_query_result(self, result, keyword_id=None):
        """Insert one QueryResult and return the new query_logs id."""
        session = self.Session()
        try:
            log = QueryLog(
                engine=result.engine,
                keyword_id=keyword_id,
                keyword=result.keyword,
                query_text=result.query,
                response_text=result.full_response,
                response_length=len(result.full_response) if result.full_response else 0,
                raw_response_json=result.raw_metadata,
                error_message=result.error,
                query_timestamp=result.timestamp
            )
            session.add(log)
            session.commit()
            return log.id
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def update_citation_result(self, log_id: int, citation_result: dict):
        session = self.Session()
        try:
            log = session.query(QueryLog).filter_by(id=log_id).first()
            if log:
                all_matches = (citation_result['exact_matches']
                               + citation_result['domain_matches'])
                log.is_cited = citation_result['is_cited']
                log.citation_count = len(all_matches)
                log.citation_rank = citation_result['citation_rank']
                log.semantic_score = citation_result['semantic_score']
                # Store domain matches as well as exact matches.
                for match in all_matches:
                    detail = CitationDetail(
                        query_log_id=log_id,
                        match_type=match['type'],
                        matched_text=match['matched_text'],
                        match_position=match['position'],
                        # The detector returns one combined window around the
                        # match; it is kept in context_after.
                        context_after=match['context']
                    )
                    session.add(detail)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
8. Visualization Dashboard
Once data is collected and analyzed, we need a dashboard to present it. Streamlit and Plotly make it quick to build an interactive one. The dashboard contains KPI cards, a citation rate trend chart, and a citation rank distribution chart:
# dashboard.py
import os
from datetime import datetime, timedelta

import pandas as pd
import plotly.express as px
import streamlit as st
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()  # make the DB_* variables from .env visible to os.getenv
st.set_page_config(page_title="AI Citation Rate Dashboard", layout="wide")


@st.cache_resource
def get_db_connection():
    db_url = (
        f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
        f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
    )
    return create_engine(db_url)


engine = get_db_connection()
st.title("AI Citation Rate Monitoring and Diagnosis Dashboard")
st.markdown("Monitor how your brand is cited across the major AI search engines")
st.sidebar.header("Filters")
date_range = st.sidebar.date_input(
    "Date range",
    [datetime.now() - timedelta(days=30), datetime.now()]
)
selected_engines = st.sidebar.multiselect(
    "AI engines",
    ["ChatGPT", "Perplexity", "DeepSeek", "通义千问", "Kimi", "豆包", "Claude"],
    default=["ChatGPT", "Perplexity", "DeepSeek"]
)
# While the user is still picking, date_input returns a single date.
if len(date_range) != 2:
    st.info("Select both a start date and an end date.")
    st.stop()
# Use a half-open range so rows from the whole end date are included.
params = {
    'start_date': date_range[0],
    'end_date': date_range[1] + timedelta(days=1),
    'engines': selected_engines,
}

col1, col2, col3, col4 = st.columns(4)
sql_metrics = '''
SELECT
    COUNT(*) as total_queries,
    SUM(CASE WHEN is_cited THEN 1 ELSE 0 END) as cited_queries,
    AVG(semantic_score) as avg_semantic_score,
    AVG(CASE WHEN is_cited THEN citation_rank END) as avg_rank
FROM query_logs
WHERE query_timestamp >= :start_date AND query_timestamp < :end_date
  AND engine = ANY(:engines)
'''
df_metrics = pd.read_sql(text(sql_metrics), engine, params=params)
# SUM and AVG return NULL when no rows match, so guard against NaN/None.
total = int(df_metrics['total_queries'].iloc[0] or 0)
cited_raw = df_metrics['cited_queries'].iloc[0]
cited = int(cited_raw) if pd.notna(cited_raw) else 0
rate = (cited / total * 100) if total > 0 else 0
avg_rank = df_metrics['avg_rank'].iloc[0]
with col1:
    st.metric("Total queries", f"{total:,}")
with col2:
    st.metric("Cited queries", f"{cited:,}")
with col3:
    st.metric("Citation rate", f"{rate:.2f}%")
with col4:
    st.metric("Average citation rank", f"{avg_rank:.1f}" if pd.notna(avg_rank) else "N/A")

st.subheader("Citation rate trend by engine")
sql_trend = '''
SELECT
    DATE(query_timestamp) as date,
    engine,
    COUNT(*) as total,
    SUM(CASE WHEN is_cited THEN 1 ELSE 0 END) as cited
FROM query_logs
WHERE query_timestamp >= :start_date AND query_timestamp < :end_date
  AND engine = ANY(:engines)
GROUP BY DATE(query_timestamp), engine
ORDER BY date
'''
df_trend = pd.read_sql(text(sql_trend), engine, params=params)
df_trend['citation_rate'] = (df_trend['cited'] / df_trend['total'] * 100).fillna(0)
fig_trend = px.line(
    df_trend, x='date', y='citation_rate', color='engine',
    title='Citation rate trend by engine',
    labels={'citation_rate': 'Citation rate (%)', 'date': 'Date', 'engine': 'Engine'}
)
fig_trend.update_layout(height=400)
st.plotly_chart(fig_trend, use_container_width=True)

st.subheader("Citation rank distribution")
sql_rank = '''
SELECT engine, citation_rank, COUNT(*) as count
FROM query_logs
WHERE is_cited = TRUE
  AND citation_rank IS NOT NULL
  AND query_timestamp >= :start_date AND query_timestamp < :end_date
  AND engine = ANY(:engines)
GROUP BY engine, citation_rank
ORDER BY engine, citation_rank
'''
df_rank = pd.read_sql(text(sql_rank), engine, params=params)
fig_rank = px.bar(
    df_rank, x='engine', y='count', color='citation_rank',
    barmode='group', title='Citation rank distribution by engine',
    labels={'count': 'Count', 'engine': 'Engine', 'citation_rank': 'Citation rank'}
)
st.plotly_chart(fig_rank, use_container_width=True)
To run the dashboard, execute the following in a terminal:
streamlit run dashboard.py
Then open http://localhost:8501 in a browser to view the interactive dashboard.
9. Anomaly Alerting
When the AI citation rate moves abnormally, for example a sharp drop, we want to know right away. This section implements webhook alerts for WeCom and Feishu. The rule: an alert fires when the citation rate over the last day has dropped by more than 20% relative to the average of the preceding days in a 7-day lookback window (a relative change, not 20 percentage points); a drop of more than 40% is marked critical:
# alerting.py
from datetime import datetime

import requests
from sqlalchemy import create_engine, text


class AlertManager:
    def __init__(self, db_url: str, webhook_configs: dict):
        self.engine = create_engine(db_url)
        self.wework_webhook = webhook_configs.get('wework')
        self.feishu_webhook = webhook_configs.get('feishu')

    def check_citation_rate(self, engine_name: str,
                            threshold: float = 0.2,
                            lookback_days: int = 7) -> dict:
        # The interval is built as :days * INTERVAL '1 day' because a bind
        # parameter must not be placed inside a quoted SQL literal.
        sql_check = """
        WITH current_rate AS (
            SELECT
                SUM(CASE WHEN is_cited THEN 1 ELSE 0 END)::float
                / NULLIF(COUNT(*), 0) as rate
            FROM query_logs
            WHERE engine = :engine
              AND query_timestamp >= NOW() - INTERVAL '1 day'
        ),
        historical_rate AS (
            SELECT
                SUM(CASE WHEN is_cited THEN 1 ELSE 0 END)::float
                / NULLIF(COUNT(*), 0) as rate
            FROM query_logs
            WHERE engine = :engine
              AND query_timestamp >= NOW() - (:days * INTERVAL '1 day')
              AND query_timestamp < NOW() - INTERVAL '1 day'
        )
        SELECT c.rate as current_rate,
               h.rate as historical_rate,
               CASE WHEN h.rate > 0 THEN
                   (c.rate - h.rate) / h.rate ELSE NULL
               END as change_pct
        FROM current_rate c, historical_rate h
        """
        with self.engine.connect() as conn:
            result = conn.execute(text(sql_check), {
                'engine': engine_name,
                'days': lookback_days
            }).fetchone()
        if result and result.current_rate is not None:
            alert_info = {
                'engine': engine_name,
                'current_rate': round(result.current_rate * 100, 2),
                'historical_rate': round((result.historical_rate or 0) * 100, 2),
                'change_pct': round((result.change_pct or 0) * 100, 2),
                'should_alert': False
            }
            # Check the stricter threshold first; otherwise the warning branch
            # would catch every drop and "critical" could never be reached.
            if result.change_pct is not None and result.change_pct < -threshold * 2:
                alert_info['should_alert'] = True
                alert_info['severity'] = 'critical'
            elif result.change_pct is not None and result.change_pct < -threshold:
                alert_info['should_alert'] = True
                alert_info['severity'] = 'warning'
            return alert_info
        return {'engine': engine_name, 'should_alert': False, 'error': 'No data'}

    def _alert_lines(self, alert_info: dict) -> list:
        return [
            ("Engine", alert_info['engine']),
            ("Current citation rate", f"{alert_info['current_rate']}%"),
            ("Historical average", f"{alert_info['historical_rate']}%"),
            ("Change", f"{alert_info['change_pct']}%"),
            ("Alert time", datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
        ]

    def send_wework_alert(self, alert_info: dict):
        if not self.wework_webhook or not alert_info.get('should_alert'):
            return
        # Join with real newlines; an escaped "\\n" would be sent literally.
        lines = ["## AI citation rate alert"]
        lines += [f"> {name}: {value}" for name, value in self._alert_lines(alert_info)]
        lines.append("Open the AI citation rate dashboard for details.")
        payload = {"msgtype": "markdown", "markdown": {"content": "\n".join(lines)}}
        try:
            resp = requests.post(self.wework_webhook, json=payload, timeout=10)
            if resp.status_code == 200:
                print(f"[ALERT] WeCom alert sent: {alert_info['engine']}")
            else:
                print(f"[ERROR] WeCom alert failed: {resp.text}")
        except Exception as e:
            print(f"[ERROR] WeCom alert error: {e}")

    def send_feishu_alert(self, alert_info: dict):
        if not self.feishu_webhook or not alert_info.get('should_alert'):
            return
        card_content = "\n".join(
            f"**{name}**: {value}" for name, value in self._alert_lines(alert_info)
        )
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"tag": "plain_text", "content": "AI citation rate alert"},
                    "template": "red" if alert_info.get('severity') == 'critical' else "orange"
                },
                "elements": [{
                    "tag": "div",
                    "text": {"tag": "lark_md", "content": card_content}
                }]
            }
        }
        try:
            resp = requests.post(self.feishu_webhook, json=payload, timeout=10)
            if resp.status_code == 200:
                print(f"[ALERT] Feishu alert sent: {alert_info['engine']}")
            else:
                print(f"[ERROR] Feishu alert failed: {resp.text}")
        except Exception as e:
            print(f"[ERROR] Feishu alert error: {e}")

    def run_daily_check(self):
        engines = ["ChatGPT", "Perplexity", "DeepSeek", "通义千问", "Kimi", "豆包"]
        for engine in engines:
            alert_info = self.check_citation_rate(engine)
            if alert_info.get('should_alert'):
                self.send_wework_alert(alert_info)
                self.send_feishu_alert(alert_info)
        print(f"[INFO] Daily alert check finished - {datetime.now()}")
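The severity rule is easier to reason about, and to unit test, when it is pulled out of the SQL path into a pure function. The sketch below is a hypothetical helper (classify_change is not part of the code above) that works on relative change and checks the stricter threshold first. It also shows why relative change and percentage points differ: a fall from 30% to 22.5% is 7.5 percentage points but a 25% relative drop.

```python
from typing import Optional


def classify_change(current_rate: Optional[float],
                    historical_rate: Optional[float],
                    threshold: float = 0.2) -> Optional[str]:
    """Return 'critical', 'warning', or None for a relative drop in citation rate.

    Rates are fractions in [0, 1]. A drop beyond 2 * threshold is critical,
    a drop beyond threshold is a warning.
    """
    if current_rate is None or not historical_rate:
        return None  # no baseline to compare against
    change = (current_rate - historical_rate) / historical_rate
    if change < -2 * threshold:
        return "critical"
    if change < -threshold:
        return "warning"
    return None


if __name__ == "__main__":
    for current, baseline in [(0.10, 0.30), (0.225, 0.30), (0.28, 0.30)]:
        print(current, baseline, classify_change(current, baseline))
```

Keeping the rule in one function also makes it straightforward to switch to a percentage-point rule later if relative change turns out to be too noisy for low baseline rates.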
10. Scheduled Jobs and Automation
To keep the system running on its own, we configure scheduled jobs with Python's schedule library. The collection job runs every 4 hours and the alert job runs every day at 09:00:
# cron_runner.py
import os
import time

import schedule
import yaml
from dotenv import load_dotenv

from alerting import AlertManager
from citation_detector import CitationDetector
from database import DatabaseManager
from scheduler import CollectorScheduler

load_dotenv()
with open('config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

db_manager = DatabaseManager(os.getenv('DATABASE_URL'))
collector_scheduler = CollectorScheduler({
    'openai_key': os.getenv('OPENAI_API_KEY'),
    'perplexity_key': os.getenv('PERPLEXITY_API_KEY'),
    'deepseek_key': os.getenv('DEEPSEEK_API_KEY'),
    'dashscope_key': os.getenv('DASHSCOPE_API_KEY'),
})
detector = CitationDetector(
    brand_patterns=config['brand_patterns'],
    domains=config['domains']
)
alert_manager = AlertManager(
    db_url=os.getenv('DATABASE_URL'),
    webhook_configs={
        'wework': os.getenv('WEWORK_WEBHOOK'),
        'feishu': os.getenv('FEISHU_WEBHOOK')
    }
)


def run_collection_job():
    print(f"[JOB] Collection started - {time.strftime('%Y-%m-%d %H:%M:%S')}")
    keywords = config.get('keywords', [])
    results = collector_scheduler.run_batch(keywords[:20])
    for result in results:
        # keyword_id stays NULL: keywords come from config.yaml here, and an
        # id of 0 would violate the foreign key on keywords(id).
        log_id = db_manager.save_query_result(result, keyword_id=None)
        if result.error:
            continue  # a failed query has no answer text to analyze
        citation = detector.analyze_citation(result.full_response)
        db_manager.update_citation_result(log_id, citation)
    print(f"[JOB] Collection finished - {len(results)} results processed")


def run_alert_job():
    print(f"[JOB] Alert check started - {time.strftime('%Y-%m-%d %H:%M:%S')}")
    alert_manager.run_daily_check()


schedule.every(4).hours.do(run_collection_job)
schedule.every().day.at("09:00").do(run_alert_job)

if __name__ == "__main__":
    print("[SYSTEM] AI citation rate monitor started")
    print("[SYSTEM] Jobs configured: collection every 4 hours | alerts daily at 09:00")
    run_collection_job()  # run once at startup instead of waiting 4 hours
    while True:
        schedule.run_pending()
        time.sleep(60)
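11. Pitfalls and Optimization Tips from Practice
11.1 Controlling API cost
Every AI engine charges for API calls, and without controls the monthly bill can reach several thousand RMB. A few ways to save:
- Priority tiers: collect high-priority keywords (brand terms, core product terms) once a day, medium-priority keywords every 3 days, and low-priority keywords once a week. Adjusting keyword priority gives you a direct lever on cost
- Sampling: for a group of similar keywords (say 10 competitor names), randomly sample only 3 per run. That cuts calls for the group by about 70%, and over many runs the trend is usually still visible
- Cache reuse: serve repeated queries for the same keyword within 24 hours from cache instead of calling the same API again
- Model choice: use cheaper models for non-core engines, for example a smaller model such as gpt-4o-mini in place of gpt-4o, which can cut API cost by more than half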
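The priority tiers above reduce to a small "is this keyword due?" check. The sketch below assumes that the priority column of the keywords table uses 1 for high, 2 for medium, and 3 for low priority; the article does not define the mapping, so INTERVAL_DAYS and is_due are illustrative names and values.

```python
from datetime import datetime, timedelta
from typing import Optional

# Assumed mapping: priority value -> days between collections.
INTERVAL_DAYS = {1: 1, 2: 3, 3: 7}


def is_due(priority: int, last_collected: Optional[datetime],
           now: datetime) -> bool:
    """Return True when a keyword should be collected again."""
    if last_collected is None:
        return True  # never collected before
    interval = timedelta(days=INTERVAL_DAYS.get(priority, 7))
    return now - last_collected >= interval


if __name__ == "__main__":
    now = datetime(2025, 1, 10, 9, 0)
    yesterday = now - timedelta(days=1)
    for priority in (1, 2, 3):
        print(priority, is_due(priority, yesterday, now))
```

The collection job can filter its keyword list with this check before calling run_batch, so low-priority keywords stop consuming quota on every run.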
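11.2 False citation judgments
The most common pitfalls in citation detection are over-matching and under-matching. For example, the brand name 云帆 (YunFan) can appear inside an unrelated phrase such as 千帆云帆, while the pinyin form YunFan or the abbreviation YF can be missed. Suggested measures:
- Maintain a blacklist of phrases to exclude common false-match patterns
- Manually spot-check borderline samples on a regular basis (weekly) and keep refining the regex rules
- Bring in an NER (named entity recognition) model to assist detection and lower the error rate
- Use the context window to judge whether a match is semantically related to the brand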
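The blacklist idea can be sketched as follows: find every occurrence of a blacklisted phrase first, then drop brand matches that fall entirely inside one of them. find_brand_mentions is a hypothetical helper shown on its own for clarity; in the real system the same filter would sit inside CitationDetector.

```python
import re


def find_brand_mentions(text: str, brand: str, blacklist: list[str]) -> list[int]:
    """Return start offsets of brand matches that are not inside a blacklisted phrase."""
    # Spans covered by any blacklisted phrase.
    blocked = [
        (m.start(), m.end())
        for phrase in blacklist
        for m in re.finditer(re.escape(phrase), text)
    ]
    hits = []
    for m in re.finditer(re.escape(brand), text, re.IGNORECASE):
        inside_blocked = any(start <= m.start() and m.end() <= end
                             for start, end in blocked)
        if not inside_blocked:
            hits.append(m.start())
    return hits


if __name__ == "__main__":
    sample = "千帆云帆是一个常见短语,而云帆科技是一家公司。"
    print(find_brand_mentions(sample, "云帆", ["千帆云帆"]))
```

The blacklist handles over-matching only; aliases such as YunFan or YF still need to be listed in brand_aliases to avoid under-matching.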
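11.3 API rate limits and retries
Almost every AI API enforces a rate limit. A 429 error should be handled gracefully:
- Implement exponential backoff: wait 1 second before the first retry, 2 seconds before the second, 4 seconds before the third, and so on, up to 5 retries
- Use a token bucket algorithm to cap the overall call rate
- Automatically downgrade engines that hit the limit often by lowering their collection frequency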
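A minimal sketch of the exponential backoff rule, assuming the wrapped call raises an exception on failure. call_with_backoff and its parameters are illustrative names; the small random jitter is an addition not mentioned above, included so that parallel workers do not all retry at the same instant. The sleep function is injectable to keep the helper testable.

```python
import random
import time
from typing import Callable


def call_with_backoff(func: Callable[[], object],
                      max_retries: int = 5,
                      base_delay: float = 1.0,
                      is_retryable: Callable[[Exception], bool] = lambda exc: True,
                      sleep: Callable[[float], None] = time.sleep):
    """Call func, retrying with delays of 1s, 2s, 4s, ... up to max_retries times."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception as exc:
            # Give up when retries are exhausted or the error is not retryable.
            if attempt == max_retries or not is_retryable(exc):
                raise
            delay = base_delay * (2 ** attempt)
            # Up to 10% jitter on top of the exponential delay.
            sleep(delay + random.uniform(0, 0.1 * delay))


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky_call():
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise RuntimeError("429 Too Many Requests")
        return "ok"

    print(call_with_backoff(flaky_call, base_delay=0.01))
```

In the collectors, the API call inside query() could be wrapped with this helper, with is_retryable restricted to rate-limit and transient network errors so that authentication failures are not retried.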
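11.4 Data volume and storage
Suppose you monitor 200 keywords across 8 engines once a day, with answers averaging 2,000 characters. That is about 3.2 million characters per day: roughly 3.2 MB at one byte per character, and closer to 9.6 MB for Chinese text in UTF-8 at three bytes per character. Per month that comes to roughly 96 MB to 288 MB of raw text, before metadata and indexes. If you need to keep a year of data, consider the following:
- PostgreSQL already compresses large response_text values through TOAST by default; check that the column has not been switched to uncompressed storage
- Automatically archive data older than 3 months to S3 or another object store
- Keep summary data in the database and retain detail rows only for the most recent 3 months
- Partition the table by month on query_timestamp to speed up queries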
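The storage estimate is simple arithmetic, and writing it as a function makes it easy to plug in your own numbers. The byte-per-character values are assumptions: 1 for mostly ASCII text and 3 for Chinese text encoded as UTF-8. The figures cover raw answer text only, not row overhead, indexes, or compression.

```python
def daily_storage_bytes(keywords: int, engines: int, runs_per_day: int,
                        avg_chars: int, bytes_per_char: int) -> int:
    """Raw answer-text volume per day, in bytes."""
    return keywords * engines * runs_per_day * avg_chars * bytes_per_char


if __name__ == "__main__":
    for label, width in [("ASCII", 1), ("Chinese UTF-8", 3)]:
        per_day = daily_storage_bytes(200, 8, 1, 2000, width)
        per_month = per_day * 30
        print(f"{label}: {per_day / 1e6:.1f} MB/day, {per_month / 1e6:.0f} MB/month")
```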
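12. Action Items for GEO Optimization
Once the system gives you citation rate data, the next step is to act on it. Some suggestions based on the monitoring data:
- Identify the traits of highly cited content: analyze which pages or pieces of content AI engines cite most, summarize what they share (length, structure, format, authority signals, and so on), and apply those traits to other pages
- Close zero-citation gaps: find keywords that are asked about often in your industry but where your brand is never mentioned, and create focused content for them
- Improve citation position: if your brand always appears in the second half of AI answers, your authority on the topic may be weak; strengthen backlinks, authoritative references, and structured data markup
- Monitor competitor citations: add competitor names to the keyword library, compare citation rates, and work out where their GEO advantage comes from
- Address negative sentiment: if AI engines mention you in a negative tone, analyze the cause and publish positive content to offset it
- Structured data markup: add Schema.org structured data to your site to help AI engines understand and cite your content
- Keep content fresh: update highly cited pages regularly to keep them current and relevant, since AI engines often appear to favor recently updated content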
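13. Frequently Asked Questions
Q: Can the system run if I do not have API keys for every AI engine?
A: Yes. The scheduler only creates a collector when the corresponding API key is present in the configuration. Configure the engines you have access to and the rest are skipped automatically.
Q: Roughly how much do the API calls cost per month?
A: It depends on the number of keywords and engines. Monitoring 200 keywords across 4 engines means about 800 API calls a day; the author's rough estimate at mainstream model prices is 200 to 500 RMB per month, and actual cost varies with model choice and answer length. With priority tiers and sampling it can drop below 100 RMB.
Q: How accurate is citation detection?
A: Exact matching is very precise for distinctive brand names, though short or common names can still produce the false matches described in section 11.2. The author's rough estimates are 85% to 90% accuracy for semantic matching with a sensible threshold, and about 95% recall and 92% precision for the three stages combined; treat these as ballpark figures and measure on your own data. Manually spot-checking 20 to 30 samples a week is recommended.
Q: Does the system support real-time monitoring?
A: The default is scheduled batch collection (every 4 hours), which is not real time. For near-real-time monitoring you can shorten the interval to 30 minutes, but watch API cost and rate limits.
Q: How do I add a new AI engine?
A: Subclass AICollectorBase, implement the query method, and add an instance in CollectorScheduler._init_collectors. For an OpenAI-compatible API this takes about 30 lines, as the DeepSeek collector shows.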
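14. Summary and Outlook
This article explained how to build an AI citation rate monitoring and diagnosis system from scratch in Python. It covered keyword library construction, multi-source collection (the architecture targets eight engines; collectors are implemented here for ChatGPT, Perplexity, DeepSeek, and Tongyi Qianwen), citation detection (exact matching, semantic matching, and sentiment analysis), database storage, a Streamlit dashboard, and anomaly alerting through WeCom and Feishu.
The core value of the system is visibility into, and some control over, how your brand appears in AI-era search results. In traditional SEO you can check search performance in Google Search Console and Baidu Webmaster Tools; for AI search, free open-source tools that do the same have been scarce. The code in this article totals more than five hundred lines and can serve as a starting framework that you extend and customize for your own needs.
Looking ahead, several directions in AI citation monitoring are worth exploring:
- Multimodal citation detection: as AI engines support image and video search, extend detection to image watermarks and video content recognition
- Real-time streaming monitoring: consume streaming responses (for example over SSE or WebSocket, where an engine offers them) to spot citation changes as early as possible
- A/B testing platform: systematically test how different content strategies affect the AI citation rate to find the best GEO approach
- Predictive models: train models on historical data to predict which content strategies are most likely to raise the citation rate
- Multilingual coverage: extend monitoring to English, Japanese, Korean, and other major languages to support brands expanding overseas
- Citation attribution: track what users do after an AI cites your content (click-through rate, conversion rate) to quantify the business value of GEO
The era of AI search has arrived, and GEO is becoming a core capability for any company that cares about online growth. When users stop clicking through search results one by one and read the AI-generated answer instead, whether your brand appears in that answer becomes decisive for online visibility. I hope this article serves as your first step into GEO, and I welcome your experience and feedback from practice.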
Appendix: Project File Structure and Deployment Guide
geo-citation-monitor/
├── .env
├── config.yaml
├── collector_base.py
├── collectors/
│   ├── __init__.py
│   ├── chatgpt_collector.py
│   ├── perplexity_collector.py
│   ├── deepseek_collector.py
│   ├── qwen_collector.py
│   ├── kimi_collector.py
│   └── doubao_collector.py
├── citation_detector.py
├── sentiment_analyzer.py
├── database.py
├── scheduler.py
├── alerting.py
├── dashboard.py
├── cron_runner.py
├── utils/
│   ├── __init__.py
│   ├── logger.py
│   └── config_loader.py
└── requirements.txt
Run the following commands to deploy the system:
# 1. Install dependencies
pip install -r requirements.txt
# 2. Configure environment variables
cp .env.example .env
# Edit .env and fill in the API key for each platform
# 3. Initialize the database
python -c "from database import DatabaseManager; DatabaseManager('postgresql://user:pass@localhost/geo_monitor').init_tables()"
# 4. Start the monitor (scheduled collection and alerting)
python cron_runner.py
# 5. Start the dashboard (in a separate terminal)
streamlit run dashboard.py
References:
- OpenAI API documentation: https://platform.openai.com/docs
- Perplexity API: https://docs.perplexity.ai
- DeepSeek API: https://platform.deepseek.com/docs
- Alibaba Cloud DashScope: https://help.aliyun.com/document_detail/2712195.html
- Streamlit documentation: https://docs.streamlit.io
- Plotly Python documentation: https://plotly.com/python/
- SQLAlchemy documentation: https://docs.sqlalchemy.org/




