本文作者:咔咔

如何实时高效抓取并安全存储海量股票行情数据?

咔咔 2025-12-18 1 抢沙发
如何实时高效抓取并安全存储海量股票行情数据?摘要: 核心概念在开始之前,我们需要理解几个核心概念:行情数据:主要包括两种类型快照数据:每 N 秒(如 5 秒、10 秒)推送一次的当前市场状态,包含股票的最新价、最高价、最低价、成交量...

核心概念

在开始之前,我们需要理解几个核心概念:

  1. 行情数据:主要包括两种类型

    如何实时高效抓取并安全存储海量股票行情数据?
    (图片来源网络,侵删)
    • 快照数据:每 N 秒(如 5 秒、10 秒)推送一次的当前市场状态,包含股票的最新价、最高价、最低价、成交量、涨跌幅等,这是最常见的行情数据。
    • 逐笔成交数据:每一笔成交的详细信息,包括成交时间、成交价、成交量,对于高频交易和深度分析至关重要。
  2. 数据源:从哪里获取数据?

    • 免费源:新浪财经、腾讯财经、网易财经、东方财富等,数据有延迟(15 分钟),适合学习和非实时性要求不高的场景。
    • 付费/专业源
      • 交易所直连:最权威、最实时,但成本极高,门槛也高,通常是机构才能接入。
      • 金融数据服务商:如 Wind(万得)、同花顺 iFinD、东方财富 Choice 等,提供高质量、低延迟的数据,但有昂贵的订阅费。
      • API 接口服务商:如 Tushare、AKShare、Alpha Vantage、Polygon.io 等,它们聚合了多种数据源,提供 API 接口,成本相对较低,是个人开发者和小型团队的首选。
  3. 存储方案:数据存到哪里?

    • 关系型数据库:如 MySQL, PostgreSQL,适合存储结构化的快照数据,可以方便地进行查询和关联,但对于高频逐笔数据,写入性能可能成为瓶颈。
    • 时序数据库:如 InfluxDB, TimescaleDB (基于 PostgreSQL), Prometheus,这是存储时间序列数据的最佳选择,它针对时间戳数据进行了高度优化,写入和查询性能极高,压缩率高,非常适合存储股票行情这种天然带时间戳的数据。
    • 文件存储:如 Parquet, CSV 文件,适合做数据归档、离线分析或大数据处理(如使用 Spark, Hive)。

使用免费 API + 本地存储 (入门级)

这个方案成本低,适合学习、个人研究或搭建简单的监控应用。

步骤 1:选择数据源和库

  • 数据源:新浪财经,它的数据接口虽然非官方,但被广泛使用,相对稳定。
  • Python 库
    • requests: 用于发送 HTTP 请求获取数据。
    • pandas: 用于处理和结构化数据。
    • influxdb-client: 用于将数据写入 InfluxDB 时序数据库。
    • scheduleAPScheduler: 用于定时抓取数据。

步骤 2:获取实时行情数据

新浪财经提供了一个获取 A 股实时行情的接口,获取所有 A 股的行情快照:

如何实时高效抓取并安全存储海量股票行情数据?
(图片来源网络,侵删)
import requests
import pandas as pd
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置 ---
# 新浪财经行情接口
SINA_URL = "http://hq.sinajs.cn/list="
# InfluxDB 配置
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your_token" # 替换为你的 token
INFLUXDB_ORG = "your_org"   # 替换为你的组织
INFLUXDB_BUCKET = "stock_market" # 替换为你的 bucket
# 获取所有 A 股代码 (这里简化处理,实际可以从文件或数据库获取)
# 获取 'sh600000' 到 'sh604999' 和 'sz000001' 到 'sz004999'
stock_codes = [f"sh{i:06d}" for i in range(600000, 605000)] + [f"sz{i:06d}" for i in range(1, 5000)]
def fetch_sina_data(codes):
    """从新浪财经抓取指定股票的行情数据"""
    all_codes_str = ",".join(codes)
    response = requests.get(SINA_URL + all_codes_str)
    data_str = response.text.split(";")
    stock_data_list = []
    for i, code in enumerate(codes):
        if i < len(data_str) - 1:
            # 解析返回的字符串,格式为 "var hq_str_s_sh600000="..."
            content = data_str[i].split('="')[1].replace('"', '')
            fields = content.split(',')
            # 构造字典
            stock_info = {
                'code': code,
                'name': fields[0],
                'open': float(fields[1]),
                'close': float(fields[2]),
                'current_price': float(fields[3]),
                'high': float(fields[4]),
                'low': float(fields[5]),
                'volume': float(fields[8]),
                'timestamp': pd.to_datetime(fields[30] + ' ' + fields[31]) # 日期+时间
            }
            stock_data_list.append(stock_info)
    return stock_data_list
# --- 主循环 ---
# 初始化 InfluxDB 客户端
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
try:
    while True:
        print(f"正在抓取数据... {time.ctime()}")
        data_list = fetch_sina_data(stock_codes)
        # 将数据写入 InfluxDB
        for stock_data in data_list:
            point = Point("stock_snapshot") # 测量名称
            point.tag("stock_code", stock_data['code']) # 标签,用于快速过滤
            point.tag("stock_name", stock_data['name'])
            point.field("open", stock_data['open'])
            point.field("close", stock_data['close'])
            point.field("current_price", stock_data['current_price'])
            point.field("high", stock_data['high'])
            point.field("low", stock_data['low'])
            point.field("volume", stock_data['volume'])
            # 使用数据中的时间戳
            write_api.write(bucket=INFLUXDB_BUCKET, record=point, time=stock_data['timestamp'])
        print(f"成功写入 {len(data_list)} 条数据,等待 5 秒...")
        time.sleep(5) # 每 5 秒抓取一次
finally:
    write_api.close()
    client.close()

如何运行

  1. 安装依赖:pip install requests pandas influxdb-client
  2. 启动 InfluxDB 服务(可以使用 Docker 快速启动:docker run -p 8086:8086 influxdb)。
  3. 在 InfluxDB 中创建一个 bucket 和 token。
  4. 修改代码中的 INFLUXDB_ 配置。
  5. 运行脚本。

使用专业 API + 高性能存储 (生产级)

这个方案更专业,延迟更低,扩展性更强,适合构建交易系统或数据分析平台。

步骤 1:选择数据源和库

  • 数据源:Tushare Pro,它提供了高质量、低延迟(Level-2 行情)的金融数据,需要积分或付费。
  • Python 库
    • tushare: 官方 Python SDK。
    • pandas: 数据处理。
    • kafka: 用于构建消息队列,实现数据解耦和削峰填谷。
    • InfluxDB / ClickHouse: 高性能存储。

架构设计

一个更健壮的系统通常采用“生产者-消费者”架构:

[数据源] -> [API 抓取程序] -> [消息队列] -> [数据存储程序] -> [数据库]
      (Tushare)       (Producer)      (Kafka)       (Consumer)   (InfluxDB)

优势

  • 解耦:抓取逻辑和存储逻辑分离,互不影响。
  • 可靠性:如果存储程序宕机,数据会暂存在 Kafka 中,程序恢复后可以继续消费。
  • 扩展性:可以启动多个 Consumer 并行处理数据,提高写入吞吐量。

步骤 2:实现抓取与发送 (Producer)

import tushare as ts
import pandas as pd
import time
from kafka import KafkaProducer
import json
# --- 配置 ---
TUSHARE_TOKEN = "your_tushare_pro_token"
KAFKA_BROKERS = ['localhost:9092']
KAFKA_TOPIC = 'stock-realtime'
# 初始化 Tushare Pro
ts.set_token(TUSHARE_TOKEN)
pro = ts.pro_api()
# 初始化 Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def fetch_and_send():
    """获取实时行情并发送到 Kafka"""
    # 获取沪深A股实时行情
    df = pro.realtime_quotes(market='SHSE,SZSE') # SHSE上交所, SZSE深交所
    if df is not None and not df.empty:
        # 将 DataFrame 转换为字典列表
        records = df.to_dict(orient='records')
        for record in records:
            # 添加时间戳
            record['fetch_time'] = pd.Timestamp.now().isoformat()
            # 发送到 Kafka
            producer.send(KAFKA_TOPIC, value=record)
            print(f"已发送 {record['code']} 的数据到 Kafka")
if __name__ == '__main__':
    while True:
        try:
            fetch_and_send()
            print("等待 1 秒...")
            time.sleep(1) # Tushare Pro 的实时行情接口建议1秒调用一次
        except Exception as e:
            print(f"发生错误: {e}")
            time.sleep(5)

步骤 3:实现数据消费与存储 (Consumer)

from kafka import KafkaConsumer
import json
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置 ---
KAFKA_BROKERS = ['localhost:9092']
KAFKA_TOPIC = 'stock-realtime'
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your_token"
INFLUXDB_ORG = "your_org"
INFLUXDB_BUCKET = "stock_market_pro"
# 初始化 InfluxDB 客户端
influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# 初始化 Kafka Consumer
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BROKERS,
    auto_offset_reset='latest', # 从最新消息开始消费
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print("开始消费 Kafka 消息...")
for message in consumer:
    try:
        data = message.value
        code = data['code']
        # 构造 InfluxDB Point
        point = Point("stock_realtime")
        point.tag("symbol", code)
        point.tag("market", data['market'])
        point.field("price", float(data['price']))
        point.field("open", float(data['open']))
        point.field("high", float(data['high']))
        point.field("low", float(data['low']))
        point.field("volume", float(data['volume']))
        point.field("amount", float(data['amount']))
        # 使用 Kafka 消息中的时间戳,如果没有则用当前时间
        # Tushare 的数据有时效性,最好用其自带的时间戳
        # 这里简化处理,使用消息时间戳
        write_api.write(bucket=INFLUXDB_BUCKET, record=point)
    except Exception as e:
        print(f"处理消息时出错: {e}, 数据: {data}")
# 注意:在实际生产环境中,Consumer 应该是持续运行的,不要轻易退出
# write_api.close()
# influx_client.close()

关键挑战与解决方案

  1. 数据延迟

    • 问题:免费 API 延迟高,无法满足交易需求。
    • 方案:使用付费数据源(如 Tushare Pro, 交易所 Level-2),对于个人,Tushare Pro 是一个很好的平衡点。
  2. 高并发与高吞吐

    • 问题:直接写入数据库可能成为瓶颈,导致数据丢失。
    • 方案:引入消息队列(如 Kafka, Pulsar),作为缓冲层,吸收流量高峰,确保数据不丢失。
  3. 数据持久化与查询效率

    • 问题:关系型数据库处理海量时间序列数据效率低下。
    • 方案:使用时序数据库(如 InfluxDB, ClickHouse),它们为时间序列数据而生,提供极高的写入和查询性能。
  4. 连接稳定性与重试机制

    • 问题:网络不稳定或 API 限流可能导致程序中断。
    • 方案:在代码中加入 try-except 错误捕获,实现自动重连机制,对于 Kafka,可以配置消费者自动提交偏移量,并在重启时从上次停止的地方继续。
  5. 成本

    • 问题:付费数据源和云服务器成本不菲。
    • 方案
      • 根据业务需求选择合适的数据源层级(免费 vs. 付费)。
      • 使用云服务商的按量付费或预留实例来降低服务器成本。
      • 对数据进行分级存储,热数据放在高性能数据库,冷数据归档到对象存储(如 S3, OSS)。
特性 方案一 (免费+本地) 方案二 (专业+分布式)
数据源 新浪/腾讯等免费接口 Tushare Pro / Wind / 交易所
延迟 较高 (分钟级) 较低 (秒级或更低)
架构 简单,单进程 复杂,多组件 (Producer/Kafka/Consumer)
存储 本地文件 / 简单 DB 时序数据库 / 分布式存储
成本 高 (API费用 + 服务器费用)
适用场景 学习、个人研究、非实时监控 量化回测、交易系统、专业数据分析

对于初学者,强烈建议从方案一开始,理解数据抓取和存储的基本流程,当你对系统有更高要求时,再转向方案二,学习如何构建一个健壮、可扩展的实时数据管道。

文章版权及转载声明

作者:咔咔本文地址:https://jits.cn/content/22099.html发布于 2025-12-18
文章转载或复制请以超链接形式并注明出处杰思科技・AI 股讯

阅读
分享

发表评论

快捷回复:

评论列表 (暂无评论,1人围观)参与讨论

还没有评论,来说两句吧...