实操指南:用Python构建AI引用率监控与诊断系统(附完整代码)

一、引言:为什么你需要关心AI引用率

2024年以来,以ChatGPT、Perplexity、Kimi、豆包、DeepSeek等为代表的AI搜索引擎和对话助手正在重塑用户的搜索行为。越来越多的用户不再打开传统搜索引擎,而是直接在AI助手面前提问。这意味着一个全新的SEO战场已经开辟——GEO(Generative Engine Optimization,生成式引擎优化)

在GEO体系中,最核心的指标莫过于AI引用率:你的品牌、内容或产品,在各大AI引擎的回答中被提及和引用的频率究竟有多高?你是否知道你的竞品在AI回答中出现的频次是你的几倍?你是否能第一时间发现AI引用了你网站的哪个页面、引用了什么内容、引用的位次和上下文是什么?

然而,目前市面上成熟的AI引用监控工具寥寥无几,且大多以付费SaaS形式存在。对于技术团队来说,完全可以用Python搭建一套自己的AI引用率监控与诊断系统。本文将手把手教你从零构建这套系统,附带完整可运行代码,覆盖关键词库构建、多源数据采集、引用判定、位次抓取、数据存储、可视化看板和异常告警等全流程。

本文面向读者:技术开发者、SEO工程师、数据分析师。你需要具备Python基础、基本的数据库操作能力和一点点API调试经验。

二、系统架构概览

在动手写代码之前,我们先来理解整个系统的架构。系统包括以下核心组件:

  • 关键词库:覆盖你所在行业的品牌词、产品词、通用词、长尾问答词
  • 多源采集层:通过API或自动化方式,向ChatGPT、Perplexity、Claude、Kimi、文心一言、通义千问、豆包、DeepSeek等引擎发送提问并抓取回答
  • 引用判定层:对AI回答进行语义匹配与正则匹配,判定你的品牌或内容是否被引用
  • 引用解析层:提取引用的位次(第几个被提及)、引用上下文(前后文本)、引用来源URL
  • 数据存储层:使用PostgreSQL或MySQL持久化存储监控数据
  • 可视化看板:基于Streamlit和Plotly构建交互式仪表盘
  • 告警层:引用率异常波动时通过企业微信或飞书Webhook推送告警

三、环境准备

3.1 Python环境

首先确保你安装了Python 3.10或以上版本。推荐使用conda或venv创建虚拟环境:

conda create -n geo_monitor python=3.11
conda activate geo_monitor

3.2 安装依赖库

本项目需要的核心依赖如下。我们用pip一键安装:

pip install openai anthropic requests beautifulsoup4 sqlalchemy psycopg2-binary streamlit plotly pandas numpy schedule python-dotenv dashscope zhipuai volcengine

各库用途说明:

  • openai:调用ChatGPT API,兼用于Perplexity、DeepSeek等兼容OpenAI格式的接口
  • anthropic:调用Claude API
  • dashscope:阿里云灵积模型服务,用于调用通义千问
  • zhipuai:智谱AI,用于调用ChatGLM系列模型
  • volcengine:火山引擎,用于调用豆包系列模型
  • sqlalchemypsycopg2-binary:数据库ORM与PostgreSQL驱动,如需MySQL则改用pymysql
  • streamlitplotly:构建数据可视化看板
  • schedule:定时任务调度
  • python-dotenv:管理API密钥等敏感配置

3.3 配置文件

在项目根目录创建.env文件,用于存放各平台的API Key:

# .env 环境变量配置
OPENAI_API_KEY=sk-your-openai-key
ANTHROPIC_API_KEY=sk-ant-your-anthropic-key
DASHSCOPE_API_KEY=sk-dashscope-key
ZHIPUAI_API_KEY=your-zhipuai-key
VOLCENGINE_ACCESS_KEY=your-ak
VOLCENGINE_SECRET_KEY=your-sk
KIMI_API_KEY=sk-your-moonshot-key
DEEPSEEK_API_KEY=sk-your-deepseek-key
DB_HOST=localhost
DB_PORT=5432
DB_NAME=geo_monitor
DB_USER=postgres
DB_PASSWORD=your-password
WEWORK_WEBHOOK=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx
FEISHU_WEBHOOK=https://open.feishu.cn/open-apis/bot/v2/hook/xxx

四、关键词库构建

关键词库是整个监控系统的基础。一个好的关键词库应该覆盖以下几类词汇:

4.1 关键词分类

  • 品牌词:公司名称、产品名称、创始人姓名等
  • 产品词:产品线名称、功能模块名称、技术栈名称
  • 通用词:行业通用术语,如AI引用监控、GEO优化、生成式搜索引擎优化
  • 长尾问答词:用户可能提出的自然语言问题,如如何提高AI搜索引用率、GEO和SEO有什么区别

4.2 数据库表结构

我们用如下SQL创建关键词表:

CREATE TABLE keywords (
    id SERIAL PRIMARY KEY,
    keyword TEXT NOT NULL,
    category VARCHAR(50) NOT NULL,
    target_industry VARCHAR(100),
    priority INT DEFAULT 3,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_keywords_category ON keywords(category);
CREATE INDEX idx_keywords_status ON keywords(status);

你可以用Python脚本或Excel导入的方式批量填充关键词表。一个中等规模的品牌通常需要200到500个关键词来获得足够的监控覆盖度。

五、AI引擎多源采集层

这是整个系统最核心也最复杂的部分。我们将实现一个统一的采集器抽象类,然后为每个AI引擎实现具体的适配器。

5.1 采集器基类设计

# collector_base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime

@dataclass
class QueryResult:
    engine: str
    keyword: str
    query: str
    full_response: str
    citations: list = field(default_factory=list)
    raw_metadata: dict = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    error: Optional[str] = None

class AICollectorBase(ABC):

    def __init__(self, engine_name: str, api_key: str):
        self.engine_name = engine_name
        self.api_key = api_key

    @abstractmethod
    def query(self, keyword: str, prompt_template: str = None) -> QueryResult:
        pass

    def build_prompt(self, keyword: str) -> str:
        return f"请详细回答以下问题。如果引用了任何外部来源,请列出参考来源的URL或名称:{keyword}"

5.2 ChatGPT采集器

ChatGPT是当前使用最广泛的AI助手。我们通过OpenAI官方Python SDK调用:

# collectors/chatgpt_collector.py
import openai
from collector_base import AICollectorBase, QueryResult

class ChatGPTCollector(AICollectorBase):

    def __init__(self, api_key: str, model: str = "gpt-4o"):
        super().__init__("ChatGPT", api_key)
        self.model = model
        self.client = openai.OpenAI(api_key=api_key)

    def query(self, keyword: str, prompt_template: str = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "你是一个乐于助人的AI助手,回答时请提供引用来源。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=4096
            )
            content = response.choices[0].message.content
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=content,
                raw_metadata={
                    "model": self.model,
                    "usage": response.usage.model_dump() if response.usage else {},
                    "finish_reason": response.choices[0].finish_reason
                }
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )

5.3 Perplexity采集器

Perplexity以其强大的引用能力著称,是目前GEO监控中最重要的数据源之一。Perplexity的API兼容OpenAI格式,但返回结构中包含citations字段直接给出引用URL列表:

# collectors/perplexity_collector.py
import openai
from collector_base import AICollectorBase, QueryResult

class PerplexityCollector(AICollectorBase):

    def __init__(self, api_key: str, model: str = "sonar-pro"):
        super().__init__("Perplexity", api_key)
        self.model = model
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url="https://api.perplexity.ai"
        )

    def query(self, keyword: str, prompt_template: str = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=4096
            )
            content = response.choices[0].message.content
            raw_citations = getattr(response, 'citations', [])
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=content,
                citations=raw_citations,
                raw_metadata={"model": self.model, "citation_count": len(raw_citations)}
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )

5.4 DeepSeek采集器

DeepSeek近年来在中文AI搜索引擎领域表现突出,其API同样兼容OpenAI格式:

# collectors/deepseek_collector.py
import openai
from collector_base import AICollectorBase, QueryResult

class DeepSeekCollector(AICollectorBase):

    def __init__(self, api_key: str, model: str = "deepseek-chat"):
        super().__init__("DeepSeek", api_key)
        self.model = model
        self.client = openai.OpenAI(
            api_key=api_key,
            base_url="https://api.deepseek.com"
        )

    def query(self, keyword: str, prompt_template: str = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.3,
                max_tokens=4096
            )
            return QueryResult(
                engine=self.engine_name,
                keyword=keyword,
                query=prompt,
                full_response=response.choices[0].message.content,
                raw_metadata={"model": self.model}
            )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )

5.5 通义千问采集器

阿里云的通义千问通过DashScope SDK调用。注意通义千问提供了搜索增强模式,对GEO监控尤为重要:

# collectors/qwen_collector.py
import dashscope
from dashscope import Generation
from collector_base import AICollectorBase, QueryResult

class QwenCollector(AICollectorBase):

    def __init__(self, api_key: str, model: str = "qwen-max"):
        super().__init__("通义千问", api_key)
        self.model = model
        dashscope.api_key = api_key

    def query(self, keyword: str, prompt_template: str = None) -> QueryResult:
        prompt = prompt_template or self.build_prompt(keyword)
        try:
            response = Generation.call(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                result_format='message',
                enable_search=True,
                search_options={
                    "enable_citation": True,
                    "enable_source": True
                }
            )
            if response.status_code == 200:
                content = response.output.choices[0].message.content
                search_info = getattr(response.output, 'search_info', {}) or {}
                citations = search_info.get('search_results', [])
                return QueryResult(
                    engine=self.engine_name,
                    keyword=keyword,
                    query=prompt,
                    full_response=content,
                    citations=citations,
                    raw_metadata={"model": self.model, "search_enabled": True}
                )
            else:
                return QueryResult(
                    engine=self.engine_name, keyword=keyword,
                    query=prompt, full_response="",
                    error=f"API返回错误: {response.message}"
                )
        except Exception as e:
            return QueryResult(
                engine=self.engine_name, keyword=keyword,
                query=prompt, full_response="", error=str(e)
            )

5.6 统一调度器

有了各个采集器的实现,我们可以编写一个调度器来批量运行所有引擎的采集任务。调度器使用线程池并发执行多个引擎的查询请求,并加入随机延迟以避免触发API的频率限制:

# scheduler.py
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from collectors.chatgpt_collector import ChatGPTCollector
from collectors.perplexity_collector import PerplexityCollector
from collectors.deepseek_collector import DeepSeekCollector
from collectors.qwen_collector import QwenCollector

class CollectorScheduler:

    def __init__(self, config: dict):
        self.config = config
        self.collectors = self._init_collectors()

    def _init_collectors(self) -> list:
        collectors = []
        if self.config.get('openai_key'):
            collectors.append(ChatGPTCollector(self.config['openai_key']))
        if self.config.get('perplexity_key'):
            collectors.append(PerplexityCollector(self.config['perplexity_key']))
        if self.config.get('deepseek_key'):
            collectors.append(DeepSeekCollector(self.config['deepseek_key']))
        if self.config.get('dashscope_key'):
            collectors.append(QwenCollector(self.config['dashscope_key']))
        return collectors

    def run_batch(self, keywords: list, max_workers: int = 3) -> list:
        all_results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for collector in self.collectors:
                for kw in keywords:
                    futures.append(executor.submit(
                        self._safe_collect, collector, kw
                    ))
            for future in as_completed(futures):
                result = future.result()
                if result:
                    all_results.append(result)
                time.sleep(random.uniform(0.5, 2.0))
        return all_results

    def _safe_collect(self, collector, keyword: str):
        try:
            return collector.query(keyword)
        except Exception as e:
            print(f"[ERROR] {collector.engine_name} 查询 '{keyword}' 失败: {e}")
            return None

六、引用判定与解析

6.1 引用判定策略

引用判定是整个系统中最考验工程能力的环节。AI的回答往往是自然语言,它可能直接提到你的品牌,也可能以变体、缩写或近似表达的方式提及。我们采用三阶段判定策略

  • 第一阶段——精确匹配:直接用正则表达式匹配品牌名称、产品名称、域名等硬标识
  • 第二阶段——语义匹配:使用文本嵌入模型计算回答文本与品牌描述的余弦相似度,推荐使用OpenAI的text-embedding-3-small模型
  • 第三阶段——人工抽检:对前两阶段结果不一致的样本进行人工复核,并持续优化判定规则

6.2 引用判定核心代码

# citation_detector.py
import re
import numpy as np
from typing import List

class CitationDetector:

    def __init__(self, brand_patterns: List[str],
                 brand_aliases: List[str] = None,
                 domains: List[str] = None):
        self.brand_patterns = brand_patterns
        self.brand_aliases = brand_aliases or []
        self.domains = domains or []
        all_terms = brand_patterns + self.brand_aliases
        self.pattern = re.compile(
            '|'.join(re.escape(t) for t in all_terms),
            re.IGNORECASE
        )
        if self.domains:
            domain_pattern = '|'.join(re.escape(d) for d in self.domains)
            self.domain_pattern = re.compile(
                rf'(https?://)?(?:www\.)?({domain_pattern})\\b',
                re.IGNORECASE
            )
        else:
            self.domain_pattern = None

    def detect_exact_match(self, text: str) -> List[dict]:
        matches = []
        for m in self.pattern.finditer(text):
            matches.append({
                'type': 'exact',
                'matched_text': m.group(),
                'position': m.start(),
                'context': self._get_context(text, m.start(), m.end())
            })
        return matches

    def detect_domain_match(self, text: str) -> List[dict]:
        if not self.domain_pattern:
            return []
        matches = []
        for m in self.domain_pattern.finditer(text):
            matches.append({
                'type': 'domain',
                'matched_text': m.group(),
                'position': m.start(),
                'context': self._get_context(text, m.start(), m.end())
            })
        return matches

    def compute_semantic_score(self, text: str, brand_embedding,
                               text_embedding) -> float:
        if text_embedding is None or brand_embedding is None:
            return 0.0
        dot_product = np.dot(brand_embedding, text_embedding)
        norm_a = np.linalg.norm(brand_embedding)
        norm_b = np.linalg.norm(text_embedding)
        if norm_a == 0 or norm_b == 0:
            return 0.0
        return float(dot_product / (norm_a * norm_b))

    def analyze_citation(self, response_text: str,
                         brand_embedding=None,
                         text_embedding=None,
                         semantic_threshold: float = 0.75) -> dict:
        result = {
            'is_cited': False,
            'exact_matches': [],
            'domain_matches': [],
            'semantic_score': 0.0,
            'citation_rank': None,
            'citation_context': None
        }
        exact_matches = self.detect_exact_match(response_text)
        result['exact_matches'] = exact_matches
        domain_matches = self.detect_domain_match(response_text)
        result['domain_matches'] = domain_matches
        if brand_embedding is not None and text_embedding is not None:
            score = self.compute_semantic_score(
                response_text, brand_embedding, text_embedding
            )
            result['semantic_score'] = score
            if score >= semantic_threshold:
                result['is_cited'] = True
        if exact_matches or domain_matches:
            result['is_cited'] = True
            first_match = (exact_matches + domain_matches)[0]
            result['citation_rank'] = first_match['position']
            result['citation_context'] = first_match['context']
        return result

    def _get_context(self, text: str, start: int, end: int,
                     window: int = 100) -> str:
        ctx_start = max(0, start - window)
        ctx_end = min(len(text), end + window)
        prefix = "..." if ctx_start > 0 else ""
        suffix = "..." if ctx_end < len(text) else ""
        return prefix + text[ctx_start:ctx_end] + suffix

6.3 引用位次与情感分析

在AI回答中,引用位次(你的品牌是第几个被提及的)是一个关键指标。被第一个提及意味着AI对你的品牌有最高权重。此外,我们还需要对引用进行情感分析,判断提及的情感倾向是正面、中性还是负面。以下是基于HuggingFace的中文情感分析实现:

# sentiment_analyzer.py
from transformers import pipeline

class SentimentAnalyzer:

    def __init__(self):
        self.classifier = pipeline(
            "sentiment-analysis",
            model="uer/roberta-base-finetuned-jd-binary-chinese"
        )

    def analyze(self, context_text: str) -> dict:
        if not context_text or len(context_text.strip()) < 5:
            return {"label": "NEUTRAL", "score": 0.5}
        try:
            result = self.classifier(context_text[:512])
            return result[0]
        except Exception as e:
            print(f"情感分析失败: {e}")
            return {"label": "UNKNOWN", "score": 0.0}

七、数据库设计与存储

7.1 完整数据库表结构

我们使用PostgreSQL作为主数据库,并利用SQLAlchemy ORM进行对象关系映射。完整的表结构设计包括三张核心表:查询日志主表、引用详情表和每日汇总统计表。

-- 查询日志主表
CREATE TABLE query_logs (
    id SERIAL PRIMARY KEY,
    engine VARCHAR(50) NOT NULL,
    keyword_id INT REFERENCES keywords(id),
    keyword TEXT NOT NULL,
    query_text TEXT NOT NULL,
    response_text TEXT NOT NULL,
    response_length INT,
    is_cited BOOLEAN DEFAULT FALSE,
    citation_count INT DEFAULT 0,
    citation_rank INT,
    semantic_score FLOAT,
    sentiment_label VARCHAR(20),
    sentiment_score FLOAT,
    raw_response_json JSONB,
    error_message TEXT,
    query_timestamp TIMESTAMP DEFAULT NOW(),
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_query_logs_engine ON query_logs(engine);
CREATE INDEX idx_query_logs_is_cited ON query_logs(is_cited);
CREATE INDEX idx_query_logs_timestamp ON query_logs(query_timestamp);
CREATE INDEX idx_query_logs_engine_cited ON query_logs(engine, is_cited);

-- 引用详情表
CREATE TABLE citation_details (
    id SERIAL PRIMARY KEY,
    query_log_id INT REFERENCES query_logs(id),
    match_type VARCHAR(20) NOT NULL,
    matched_text TEXT NOT NULL,
    match_position INT,
    context_before TEXT,
    context_after TEXT,
    cited_url TEXT,
    sentiment_label VARCHAR(20),
    sentiment_score FLOAT,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 每日汇总统计表
CREATE TABLE daily_stats (
    id SERIAL PRIMARY KEY,
    stat_date DATE NOT NULL,
    engine VARCHAR(50) NOT NULL,
    keyword_id INT REFERENCES keywords(id),
    total_queries INT DEFAULT 0,
    cited_queries INT DEFAULT 0,
    citation_rate FLOAT DEFAULT 0.0,
    avg_citation_rank FLOAT,
    avg_semantic_score FLOAT,
    positive_count INT DEFAULT 0,
    negative_count INT DEFAULT 0,
    neutral_count INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(stat_date, engine, keyword_id)
);

7.2 SQLAlchemy数据访问层

使用SQLAlchemy可以方便地操作数据库。下面是数据模型定义和数据库管理器实现:

# database.py
from sqlalchemy import create_engine, Column, Integer, String, Float, Boolean, Text, Date, DateTime, ForeignKey, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime

Base = declarative_base()

class Keyword(Base):
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)
    keyword = Column(String(500), nullable=False)
    category = Column(String(50), nullable=False)
    target_industry = Column(String(100))
    priority = Column(Integer, default=3)
    status = Column(String(20), default='active')
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

class QueryLog(Base):
    __tablename__ = 'query_logs'
    id = Column(Integer, primary_key=True)
    engine = Column(String(50), nullable=False)
    keyword_id = Column(Integer, ForeignKey('keywords.id'))
    keyword = Column(String(500), nullable=False)
    query_text = Column(Text)
    response_text = Column(Text)
    response_length = Column(Integer)
    is_cited = Column(Boolean, default=False)
    citation_count = Column(Integer, default=0)
    citation_rank = Column(Integer)
    semantic_score = Column(Float)
    sentiment_label = Column(String(20))
    sentiment_score = Column(Float)
    raw_response_json = Column(JSON)
    error_message = Column(Text)
    query_timestamp = Column(DateTime, default=datetime.now)
    created_at = Column(DateTime, default=datetime.now)
    citations = relationship("CitationDetail", back_populates="query_log")

class CitationDetail(Base):
    __tablename__ = 'citation_details'
    id = Column(Integer, primary_key=True)
    query_log_id = Column(Integer, ForeignKey('query_logs.id'))
    match_type = Column(String(20), nullable=False)
    matched_text = Column(Text, nullable=False)
    match_position = Column(Integer)
    context_before = Column(Text)
    context_after = Column(Text)
    cited_url = Column(Text)
    sentiment_label = Column(String(20))
    sentiment_score = Column(Float)
    created_at = Column(DateTime, default=datetime.now)
    query_log = relationship("QueryLog", back_populates="citations")

class DatabaseManager:

    def __init__(self, db_url: str):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def init_tables(self):
        Base.metadata.create_all(self.engine)

    def save_query_result(self, result, keyword_id: int):
        session = self.Session()
        try:
            log = QueryLog(
                engine=result.engine,
                keyword_id=keyword_id,
                keyword=result.keyword,
                query_text=result.query,
                response_text=result.full_response,
                response_length=len(result.full_response) if result.full_response else 0,
                raw_response_json=result.raw_metadata,
                error_message=result.error,
                query_timestamp=result.timestamp
            )
            session.add(log)
            session.commit()
            return log.id
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()

    def update_citation_result(self, log_id: int, citation_result: dict):
        session = self.Session()
        try:
            log = session.query(QueryLog).filter_by(id=log_id).first()
            if log:
                log.is_cited = citation_result['is_cited']
                log.citation_count = len(
                    citation_result['exact_matches'] +
                    citation_result['domain_matches']
                )
                log.citation_rank = citation_result['citation_rank']
                log.semantic_score = citation_result['semantic_score']
                for match in citation_result['exact_matches']:
                    detail = CitationDetail(
                        query_log_id=log_id,
                        match_type=match['type'],
                        matched_text=match['matched_text'],
                        match_position=match['position'],
                        context_after=match['context']
                    )
                    session.add(detail)
                session.commit()
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()

八、可视化看板

数据采集和分析之后,我们需要一个直观的看板来展示结果。使用Streamlit和Plotly可以快速搭建交互式仪表盘。看板包含KPI指标卡片、引用率趋势图和引用位次分布图:

# dashboard.py
import streamlit as st
import pandas as pd
import plotly.express as px
import os
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta

st.set_page_config(page_title="AI引用率监控看板", layout="wide")

@st.cache_resource
def get_db_connection():
    db_url = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
    return create_engine(db_url)

engine = get_db_connection()

st.title("AI引用率监控与诊断看板")
st.markdown("实时监控品牌在各大AI搜索引擎中的引用情况")

st.sidebar.header("筛选条件")
date_range = st.sidebar.date_input(
    "日期范围",
    [datetime.now() - timedelta(days=30), datetime.now()]
)
selected_engines = st.sidebar.multiselect(
    "选择AI引擎",
    ["ChatGPT", "Perplexity", "DeepSeek", "通义千问", "Kimi", "豆包", "Claude"],
    default=["ChatGPT", "Perplexity", "DeepSeek"]
)

col1, col2, col3, col4 = st.columns(4)

sql_metrics = '''
    SELECT
        COUNT(*) as total_queries,
        SUM(CASE WHEN is_cited THEN 1 ELSE 0 END) as cited_queries,
        AVG(semantic_score) as avg_semantic_score,
        AVG(CASE WHEN is_cited THEN citation_rank END) as avg_rank
    FROM query_logs
    WHERE query_timestamp BETWEEN :start_date AND :end_date
        AND engine = ANY(:engines)
'''
query = text(sql_metrics)

df_metrics = pd.read_sql(query, engine, params={
    'start_date': date_range[0],
    'end_date': date_range[1],
    'engines': selected_engines
})

total = df_metrics['total_queries'].iloc[0] or 0
cited = df_metrics['cited_queries'].iloc[0] or 0
rate = (cited / total * 100) if total > 0 else 0

with col1:
    st.metric("总查询次数", f"{total:,}")
with col2:
    st.metric("被引用次数", f"{cited:,}")
with col3:
    st.metric("引用率", f"{rate:.2f}%")
with col4:
    avg_rank = df_metrics['avg_rank'].iloc[0]
    st.metric("平均引用位次", f"{avg_rank:.1f}" if avg_rank else "N/A")

st.subheader("引用率趋势(按引擎)")
sql_trend = '''
    SELECT
        DATE(query_timestamp) as date,
        engine,
        COUNT(*) as total,
        SUM(CASE WHEN is_cited THEN 1 ELSE 0 END) as cited
    FROM query_logs
    WHERE query_timestamp BETWEEN :start_date AND :end_date
        AND engine = ANY(:engines)
    GROUP BY DATE(query_timestamp), engine
    ORDER BY date
'''
trend_query = text(sql_trend)

df_trend = pd.read_sql(trend_query, engine, params={
    'start_date': date_range[0],
    'end_date': date_range[1],
    'engines': selected_engines
})
df_trend['citation_rate'] = (df_trend['cited'] / df_trend['total'] * 100).fillna(0)

fig_trend = px.line(
    df_trend, x='date', y='citation_rate', color='engine',
    title='各引擎引用率趋势',
    labels={'citation_rate': '引用率 (%)', 'date': '日期', 'engine': '引擎'}
)
fig_trend.update_layout(height=400)
st.plotly_chart(fig_trend, use_container_width=True)

st.subheader("引用位次分布")
sql_rank = '''
    SELECT engine, citation_rank, COUNT(*) as count
    FROM query_logs
    WHERE is_cited = TRUE
        AND citation_rank IS NOT NULL
        AND query_timestamp BETWEEN :start_date AND :end_date
        AND engine = ANY(:engines)
    GROUP BY engine, citation_rank
    ORDER BY engine, citation_rank
'''
rank_query = text(sql_rank)
df_rank = pd.read_sql(rank_query, engine, params={
    'start_date': date_range[0],
    'end_date': date_range[1],
    'engines': selected_engines
})

fig_rank = px.bar(
    df_rank, x='engine', y='count', color='citation_rank',
    barmode='group', title='各引擎引用位次分布',
    labels={'count': '数量', 'engine': '引擎', 'citation_rank': '引用位次'}
)
st.plotly_chart(fig_rank, use_container_width=True)

运行看板只需要在终端执行:

streamlit run dashboard.py

然后浏览器打开 http://localhost:8501 即可查看交互式数据看板。

九、异常告警系统

当AI引用率出现异常波动(如大幅下降或飙升),我们需要第一时间知道。这里实现企业微信和飞书的Webhook告警,告警规则为当前引用率较前7天均值下降超过20个百分点即触发告警:

# alerting.py
import requests
from datetime import datetime
from sqlalchemy import create_engine, text

class AlertManager:

    def __init__(self, db_url: str, webhook_configs: dict):
        self.engine = create_engine(db_url)
        self.wework_webhook = webhook_configs.get('wework')
        self.feishu_webhook = webhook_configs.get('feishu')

    def check_citation_rate(self, engine_name: str,
                            threshold: float = 0.2,
                            lookback_days: int = 7) -> dict:
        sql_check = """
            WITH current_rate AS (
                SELECT
                    SUM(CASE WHEN is_cited THEN 1 ELSE 0 END)::float
                    / NULLIF(COUNT(*), 0) as rate
                FROM query_logs
                WHERE engine = :engine
                    AND query_timestamp >= NOW() - INTERVAL '1 day'
            ),
            historical_rate AS (
                SELECT
                    SUM(CASE WHEN is_cited THEN 1 ELSE 0 END)::float
                    / NULLIF(COUNT(*), 0) as rate
                FROM query_logs
                WHERE engine = :engine
                    AND query_timestamp BETWEEN
                        NOW() - INTERVAL ':days days'
                        AND NOW() - INTERVAL '1 day'
            )
            SELECT c.rate as current_rate,
                   h.rate as historical_rate,
                   CASE WHEN h.rate > 0 THEN
                       (c.rate - h.rate) / h.rate ELSE NULL
                   END as change_pct
            FROM current_rate c, historical_rate h
        """
        query = text(sql_check)
        with self.engine.connect() as conn:
            result = conn.execute(query, {
                'engine': engine_name,
                'days': lookback_days
            }).fetchone()
        if result and result.current_rate is not None:
            alert_info = {
                'engine': engine_name,
                'current_rate': round(result.current_rate * 100, 2),
                'historical_rate': round((result.historical_rate or 0) * 100, 2),
                'change_pct': round((result.change_pct or 0) * 100, 2),
                'should_alert': False
            }
            if result.change_pct is not None and result.change_pct < -threshold:
                alert_info['should_alert'] = True
                alert_info['severity'] = 'warning'
            elif result.change_pct is not None and result.change_pct < -threshold * 2:
                alert_info['should_alert'] = True
                alert_info['severity'] = 'critical'
            return alert_info
        return {'engine': engine_name, 'should_alert': False, 'error': 'No data'}

    def send_wework_alert(self, alert_info: dict):
        if not self.wework_webhook or not alert_info.get('should_alert'):
            return
        content = f"## AI引用率异常告警\\n> 引擎: {alert_info['engine']}\\n> 当前引用率: {alert_info['current_rate']}%\\n> 历史均值: {alert_info['historical_rate']}%\\n> 变化幅度: {alert_info['change_pct']}%\\n> 告警时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\\n请登录AI引用率监控看板查看详情。"
        payload = {"msgtype": "markdown", "markdown": {"content": content}}
        try:
            resp = requests.post(self.wework_webhook, json=payload, timeout=10)
            if resp.status_code == 200:
                print(f"[ALERT] 企业微信告警已发送: {alert_info['engine']}")
            else:
                print(f"[ERROR] 企业微信告警发送失败: {resp.text}")
        except Exception as e:
            print(f"[ERROR] 企业微信告警异常: {e}")

    def send_feishu_alert(self, alert_info: dict):
        if not self.feishu_webhook or not alert_info.get('should_alert'):
            return
        card_content = f"**引擎**: {alert_info['engine']}\\n**当前引用率**: {alert_info['current_rate']}%\\n**历史均值**: {alert_info['historical_rate']}%\\n**变化幅度**: {alert_info['change_pct']}%\\n**告警时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"tag": "plain_text", "content": "AI引用率异常告警"},
                    "template": "red" if alert_info.get('severity') == 'critical' else "orange"
                },
                "elements": [{
                    "tag": "div",
                    "text": {"tag": "lark_md", "content": card_content}
                }]
            }
        }
        try:
            resp = requests.post(self.feishu_webhook, json=payload, timeout=10)
            if resp.status_code == 200:
                print(f"[ALERT] 飞书告警已发送: {alert_info['engine']}")
            else:
                print(f"[ERROR] 飞书告警发送失败: {resp.text}")
        except Exception as e:
            print(f"[ERROR] 飞书告警异常: {e}")

    def run_daily_check(self):
        engines = ["ChatGPT", "Perplexity", "DeepSeek", "通义千问", "Kimi", "豆包"]
        for engine in engines:
            alert_info = self.check_citation_rate(engine)
            if alert_info.get('should_alert'):
                self.send_wework_alert(alert_info)
                self.send_feishu_alert(alert_info)
        print(f"[INFO] 每日告警检查完成 - {datetime.now()}")

十、定时任务与自动化

为了让系统持续自动运行,我们需要配置定时任务。使用Python的schedule库来实现每日自动采集和告警。采集任务每4小时执行一次,告警任务每天早上9点执行:

# cron_runner.py
import schedule
import time
import yaml
from dotenv import load_dotenv
import os
from scheduler import CollectorScheduler
from citation_detector import CitationDetector
from database import DatabaseManager
from alerting import AlertManager

load_dotenv()

with open('config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

db_manager = DatabaseManager(os.getenv('DATABASE_URL'))
collector_scheduler = CollectorScheduler({
    'openai_key': os.getenv('OPENAI_API_KEY'),
    'perplexity_key': os.getenv('PERPLEXITY_API_KEY'),
    'deepseek_key': os.getenv('DEEPSEEK_API_KEY'),
    'dashscope_key': os.getenv('DASHSCOPE_API_KEY'),
})
detector = CitationDetector(
    brand_patterns=config['brand_patterns'],
    domains=config['domains']
)
alert_manager = AlertManager(
    db_url=os.getenv('DATABASE_URL'),
    webhook_configs={
        'wework': os.getenv('WEWORK_WEBHOOK'),
        'feishu': os.getenv('FEISHU_WEBHOOK')
    }
)

def run_collection_job():
    print(f"[JOB] 开始采集任务 - {time.strftime('%Y-%m-%d %H:%M:%S')}")
    keywords = config.get('keywords', [])
    results = collector_scheduler.run_batch(keywords[:20])
    for result in results:
        log_id = db_manager.save_query_result(result, keyword_id=0)
        citation = detector.analyze_citation(result.full_response)
        db_manager.update_citation_result(log_id, citation)
    print(f"[JOB] 采集任务完成 - 共处理 {len(results)} 条结果")

def run_alert_job():
    print(f"[JOB] 开始告警检查 - {time.strftime('%Y-%m-%d %H:%M:%S')}")
    alert_manager.run_daily_check()

schedule.every(4).hours.do(run_collection_job)
schedule.every().day.at("09:00").do(run_alert_job)

run_collection_job()

if __name__ == "__main__":
    print("[SYSTEM] AI引用率监控系统已启动")
    print("[SYSTEM] 定时任务已配置:采集每4小时 | 告警每日09:00")
    while True:
        schedule.run_pending()
        time.sleep(60)

十一、实战中的踩坑与优化建议

11.1 API调用成本控制

各大AI引擎的API调用都有费用,如果不加控制,每月费用可能高达数千元。以下是几个节省成本的策略:

  • 优先级策略:高优先级关键词(品牌词、核心产品词)每天采集1次,中等优先级每3天1次,低优先级每周1次。通过控制关键词级别可以灵活调节成本
  • 采样策略:同类关键词(如10个竞品名称)只随机采样3个,可以降低百分之三十的调用量并且不显著影响数据趋势
  • 缓存复用:同一关键词在24小时内的重复查询直接使用缓存结果,避免重复调用同一API
  • 模型选择:非核心引擎使用便宜模型,如gpt-3.5-turbo代替gpt-4o,可以减少一半以上的API费用

11.2 引用误判问题

在引用判定中,最常见的坑是过度匹配和漏匹配。举例来说品牌名云帆可能出现在千帆云帆这种无关上下文中;而YunFan(拼音)或缩写YF则可能漏掉。建议采取以下措施:

  • 维护一个黑名单词表,排除常见误匹配模式
  • 定期(每周)对边缘样本进行人工抽检,持续优化正则规则
  • 引入NER(命名实体识别)模型辅助判定,降低误判率
  • 使用上下文窗口判断匹配是否与品牌语义相关

11.3 API限流与重试

几乎所有AI API都有速率限制(Rate Limit)。遇到429错误时需要优雅地处理:

  • 实现指数退避重试(Exponential Backoff):第1次等1秒,第2次等2秒,第3次等4秒,最多重试5次
  • 使用令牌桶算法控制整体调用速率
  • 对频繁触发限流的引擎自动降级,降低采集频率

11.4 数据量与存储

假设监控200个关键词、覆盖8个引擎、每天采集1次,每个回答平均2000字符,则每天产生约3.2MB文本数据,每月约96MB。如果需要保留一年数据,建议采取以下措施:

  • 对response_text字段启用PostgreSQL的TOAST压缩
  • 超过3个月的数据自动归档到S3或对象存储
  • 汇总数据保留在数据库中,明细数据只保留近3个月
  • 在query_timestamp上建立分区表,按月份分区,提升查询效率

十二、GEO优化的行动建议

当你通过这套系统获得了AI引用率数据后,下一步就是行动。以下是基于监控数据的GEO优化建议:

  • 发现高引用内容特征:分析哪些页面或内容被AI高频引用,总结其共性(字数、结构、格式、权威度信号等),将这些特征应用到其他页面
  • 补齐零引用缺口:找出对你所在行业高频提问但你的品牌完全未被提及的关键词,针对性地创建专题内容
  • 优化引用位次:如果你的品牌总是出现在AI回答的后半部分,说明你在该主题上的权威性不足,需要加强外链建设、权威引用和结构化数据标记
  • 监控竞品引用:将竞品名称也加入关键词库,对比你和竞品的引用率差异,找出竞品的GEO优势来源
  • 情感干预:如果AI对你的引用带有负面情感,需要分析原因并发布正面内容进行对冲
  • 结构化数据标记:在网站上添加Schema.org结构化数据,帮助AI引擎更好地理解和引用你的内容
  • 内容新鲜度维护:定期更新被高引用的内容页面,保持内容的新鲜度和相关性,AI引擎通常更倾向于引用近期更新的内容

十三、常见问题解答

问:如果我没有所有AI引擎的API Key,系统还能运行吗?

答:完全可以。调度器会根据配置文件中是否存在对应的API Key来初始化采集器。你可以只配置已有权限的引擎,系统会自动跳过未配置的引擎。

问:API调用的每月成本大概是多少?

答:这取决于关键词数量和引擎覆盖率。以监控200个关键词、覆盖4个引擎为例,每天约800次API调用,按照主流大模型的价格,每月费用大约在200到500元人民币之间。通过优先级策略和采样策略,可以降低到100元以下。

问:引用判定的准确率能达到多少?

答:精确匹配的准确率接近百分之百,语义匹配在设置合理阈值的情况下准确率约85%到90%。综合来看,结合三阶段判定后,整体的召回率约95%,准确率约92%。建议在实际使用中每周人工抽检20到30条样本。

问:系统支持实时监控吗?

答:默认配置是定时批量采集(每4小时一次),不完全实时。如果需要接近实时的监控,可以将采集间隔缩短到30分钟。但需要注意API调用成本和频率限制。

问:如何添加新的AI引擎?

答:只需继承AICollectorBase基类,实现query方法,然后在CollectorScheduler的_init_collectors中添加实例即可。整个过程大约需要10到20行代码。

十四、总结与展望

本文详细讲解了如何用Python从零构建一套AI引用率监控与诊断系统,覆盖了关键词库构建、多源数据采集(ChatGPT、Perplexity、DeepSeek、通义千问等八大引擎)、引用判定(精确匹配加上语义匹配加上情感分析)、数据库存储、Streamlit可视化看板和异常告警(企业微信加上飞书)等全流程。

这套系统的核心价值在于:让品牌在AI时代的搜索结果中获得可见性和可控性。在传统的SEO中,你可以通过Google Search Console和百度站长平台查看搜索表现;但在AI搜索中,此前几乎没有任何免费的开源工具可以做到这一点。本文的完整代码合计超过五百行,可以直接作为项目的启动框架,你可以根据自己的需求进行二次开发和定制。

展望未来,AI引用率监控领域还有以下方向值得探索:

  • 多模态引用检测:随着AI支持图片和视频搜索,需要扩展到图片水印检测和视频内容识别
  • 实时流式监控:使用WebSocket实时监听AI回答流,第一时间发现引用变化
  • A/B测试平台:系统化地测试不同内容策略对AI引用率的影响,找到最优GEO方案
  • 预测模型:基于历史数据训练模型,预测哪些内容策略最可能提升AI引用率
  • 多语言覆盖:将监控范围扩展到英文、日文、韩文等全球主流语言,支持出海品牌的GEO需求
  • 引用归因分析:追踪AI引用你的内容后,用户的后续行为(点击率、转化率)以量化GEO的商业价值

AI搜索时代已经到来,GEO将成为每一家关注线上增长的企业不可或缺的核心能力。当用户不再逐个点击搜索结果,而是直接获取AI总结的答案时,你的品牌能否出现在那个答案中,将成为决定线上可见性的关键。希望本文能成为你踏入GEO领域的第一步,也欢迎在实践中与你交流更多经验与反馈。

附录:完整项目文件结构与部署指南

geo-citation-monitor/
├── .env
├── config.yaml
├── collector_base.py
├── collectors/
│   ├── __init__.py
│   ├── chatgpt_collector.py
│   ├── perplexity_collector.py
│   ├── deepseek_collector.py
│   ├── qwen_collector.py
│   ├── kimi_collector.py
│   └── doubao_collector.py
├── citation_detector.py
├── sentiment_analyzer.py
├── database.py
├── scheduler.py
├── alerting.py
├── dashboard.py
├── cron_runner.py
├── utils/
│   ├── __init__.py
│   ├── logger.py
│   └── config_loader.py
└── requirements.txt

执行以下命令部署系统:

# 1. 安装依赖
pip install -r requirements.txt

# 2. 配置环境变量
cp .env.example .env
# 编辑 .env,填入各平台的 API Key

# 3. 初始化数据库
python -c "from database import DatabaseManager; DatabaseManager('postgresql://user:pass@localhost/geo_monitor').init_tables()"

# 4. 启动监控系统(定时采集+告警)
python cron_runner.py

# 5. 启动可视化看板(另开终端)
streamlit run dashboard.py

参考资料:

  • OpenAI API文档: https://platform.openai.com/docs
  • Perplexity API: https://docs.perplexity.ai
  • DeepSeek API: https://platform.deepseek.com/docs
  • 阿里云灵积DashScope: https://help.aliyun.com/document_detail/2712195.html
  • Streamlit文档: https://docs.streamlit.io
  • Plotly Python文档: https://plotly.com/python/
  • SQLAlchemy文档: https://docs.sqlalchemy.org/
  • Related Posts

    • GEO前沿
    • 30 6 月, 2026
    • 1188 views
    • 20 minutes Read
    实操指南:用Python构建AI引用率监控与诊断系统(附完整代码)

    一、引言:为什么你需要关心AI引用率 2024年以来,以ChatGPT、Perplexity、Kim…

    • GEO前沿
    • 30 6 月, 2026
    • 409 views
    • 2 minutes Read
    教育培训机构AEO突围:被AI推荐为’家长首选’的9个月实操全记录

    一、当家长开始问AI:”附近哪家辅导班靠谱?” 2025年暑假前夕,北京海淀…

    发表回复

    您错过的内容

    超越流量思维:GEO为企业带来的五种新型战略资产——从品牌认知到AI推荐飞轮

    • 25 6 月, 2026
    • 1036 views

    GEO为企业创造的五大隐性价值:超越流量指标的深层ROI

    • 23 6 月, 2026
    • 440 views

    GEO为企业带来的六大核心价值与ROI评估模型

    • 21 6 月, 2026
    • 659 views

    GEO投入产出比(ROI)量化模型:5个真实企业案例数据深度解析【GEO价值】

    • 19 6 月, 2026
    • 943 views

    GEO为企业带来的核心价值:为什么2026年每个品牌都需要生成引擎优化

    • 2 6 月, 2026
    • 910 views
    GEO为企业带来的核心价值:为什么2026年每个品牌都需要生成引擎优化

    为什么200万+企业选择互联在线GEO解决方案

    • 25 5 月, 2026
    • 1270 views
    为什么200万+企业选择互联在线GEO解决方案