如何实时高效抓取并安全存储海量股票行情数据?
摘要:
核心概念在开始之前,我们需要理解几个核心概念:行情数据:主要包括两种类型快照数据:每 N 秒(如 5 秒、10 秒)推送一次的当前市场状态,包含股票的最新价、最高价、最低价、成交量... 核心概念
在开始之前,我们需要理解几个核心概念:
-
行情数据:主要包括两种类型
(图片来源网络,侵删)- 快照数据:每 N 秒(如 5 秒、10 秒)推送一次的当前市场状态,包含股票的最新价、最高价、最低价、成交量、涨跌幅等,这是最常见的行情数据。
- 逐笔成交数据:每一笔成交的详细信息,包括成交时间、成交价、成交量,对于高频交易和深度分析至关重要。
-
数据源:从哪里获取数据?
- 免费源:新浪财经、腾讯财经、网易财经、东方财富等,数据有延迟(15 分钟),适合学习和非实时性要求不高的场景。
- 付费/专业源:
- 交易所直连:最权威、最实时,但成本极高,门槛也高,通常是机构才能接入。
- 金融数据服务商:如 Wind(万得)、同花顺 iFinD、东方财富 Choice 等,提供高质量、低延迟的数据,但有昂贵的订阅费。
- API 接口服务商:如 Tushare、AKShare、Alpha Vantage、Polygon.io 等,它们聚合了多种数据源,提供 API 接口,成本相对较低,是个人开发者和小型团队的首选。
-
存储方案:数据存到哪里?
- 关系型数据库:如 MySQL, PostgreSQL,适合存储结构化的快照数据,可以方便地进行查询和关联,但对于高频逐笔数据,写入性能可能成为瓶颈。
- 时序数据库:如 InfluxDB, TimescaleDB (基于 PostgreSQL), Prometheus,这是存储时间序列数据的最佳选择,它针对时间戳数据进行了高度优化,写入和查询性能极高,压缩率高,非常适合存储股票行情这种天然带时间戳的数据。
- 文件存储:如 Parquet, CSV 文件,适合做数据归档、离线分析或大数据处理(如使用 Spark, Hive)。
使用免费 API + 本地存储 (入门级)
这个方案成本低,适合学习、个人研究或搭建简单的监控应用。
步骤 1:选择数据源和库
- 数据源:新浪财经,它的数据接口虽然非官方,但被广泛使用,相对稳定。
- Python 库:
requests: 用于发送 HTTP 请求获取数据。pandas: 用于处理和结构化数据。influxdb-client: 用于将数据写入 InfluxDB 时序数据库。schedule或APScheduler: 用于定时抓取数据。
步骤 2:获取实时行情数据
新浪财经提供了一个获取 A 股实时行情的接口,获取所有 A 股的行情快照:
(图片来源网络,侵删)
import requests
import pandas as pd
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置 ---
# 新浪财经行情接口
SINA_URL = "http://hq.sinajs.cn/list="
# InfluxDB 配置
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your_token" # 替换为你的 token
INFLUXDB_ORG = "your_org" # 替换为你的组织
INFLUXDB_BUCKET = "stock_market" # 替换为你的 bucket
# 获取所有 A 股代码 (这里简化处理,实际可以从文件或数据库获取)
# 获取 'sh600000' 到 'sh604999' 和 'sz000001' 到 'sz004999'
stock_codes = [f"sh{i:06d}" for i in range(600000, 605000)] + [f"sz{i:06d}" for i in range(1, 5000)]
def fetch_sina_data(codes):
"""从新浪财经抓取指定股票的行情数据"""
all_codes_str = ",".join(codes)
response = requests.get(SINA_URL + all_codes_str)
data_str = response.text.split(";")
stock_data_list = []
for i, code in enumerate(codes):
if i < len(data_str) - 1:
# 解析返回的字符串,格式为 "var hq_str_s_sh600000="..."
content = data_str[i].split('="')[1].replace('"', '')
fields = content.split(',')
# 构造字典
stock_info = {
'code': code,
'name': fields[0],
'open': float(fields[1]),
'close': float(fields[2]),
'current_price': float(fields[3]),
'high': float(fields[4]),
'low': float(fields[5]),
'volume': float(fields[8]),
'timestamp': pd.to_datetime(fields[30] + ' ' + fields[31]) # 日期+时间
}
stock_data_list.append(stock_info)
return stock_data_list
# --- 主循环 ---
# 初始化 InfluxDB 客户端
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
try:
while True:
print(f"正在抓取数据... {time.ctime()}")
data_list = fetch_sina_data(stock_codes)
# 将数据写入 InfluxDB
for stock_data in data_list:
point = Point("stock_snapshot") # 测量名称
point.tag("stock_code", stock_data['code']) # 标签,用于快速过滤
point.tag("stock_name", stock_data['name'])
point.field("open", stock_data['open'])
point.field("close", stock_data['close'])
point.field("current_price", stock_data['current_price'])
point.field("high", stock_data['high'])
point.field("low", stock_data['low'])
point.field("volume", stock_data['volume'])
# 使用数据中的时间戳
write_api.write(bucket=INFLUXDB_BUCKET, record=point, time=stock_data['timestamp'])
print(f"成功写入 {len(data_list)} 条数据,等待 5 秒...")
time.sleep(5) # 每 5 秒抓取一次
finally:
write_api.close()
client.close()
如何运行:
- 安装依赖:
pip install requests pandas influxdb-client - 启动 InfluxDB 服务(可以使用 Docker 快速启动:
docker run -p 8086:8086 influxdb)。 - 在 InfluxDB 中创建一个 bucket 和 token。
- 修改代码中的
INFLUXDB_配置。 - 运行脚本。
使用专业 API + 高性能存储 (生产级)
这个方案更专业,延迟更低,扩展性更强,适合构建交易系统或数据分析平台。
步骤 1:选择数据源和库
- 数据源:Tushare Pro,它提供了高质量、低延迟(Level-2 行情)的金融数据,需要积分或付费。
- Python 库:
tushare: 官方 Python SDK。pandas: 数据处理。kafka: 用于构建消息队列,实现数据解耦和削峰填谷。InfluxDB/ClickHouse: 高性能存储。
架构设计
一个更健壮的系统通常采用“生产者-消费者”架构:
[数据源] -> [API 抓取程序] -> [消息队列] -> [数据存储程序] -> [数据库]
(Tushare) (Producer) (Kafka) (Consumer) (InfluxDB)
优势:
- 解耦:抓取逻辑和存储逻辑分离,互不影响。
- 可靠性:如果存储程序宕机,数据会暂存在 Kafka 中,程序恢复后可以继续消费。
- 扩展性:可以启动多个 Consumer 并行处理数据,提高写入吞吐量。
步骤 2:实现抓取与发送 (Producer)
import tushare as ts
import pandas as pd
import time
from kafka import KafkaProducer
import json
# --- 配置 ---
TUSHARE_TOKEN = "your_tushare_pro_token"
KAFKA_BROKERS = ['localhost:9092']
KAFKA_TOPIC = 'stock-realtime'
# 初始化 Tushare Pro
ts.set_token(TUSHARE_TOKEN)
pro = ts.pro_api()
# 初始化 Kafka Producer
producer = KafkaProducer(
bootstrap_servers=KAFKA_BROKERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def fetch_and_send():
"""获取实时行情并发送到 Kafka"""
# 获取沪深A股实时行情
df = pro.realtime_quotes(market='SHSE,SZSE') # SHSE上交所, SZSE深交所
if df is not None and not df.empty:
# 将 DataFrame 转换为字典列表
records = df.to_dict(orient='records')
for record in records:
# 添加时间戳
record['fetch_time'] = pd.Timestamp.now().isoformat()
# 发送到 Kafka
producer.send(KAFKA_TOPIC, value=record)
print(f"已发送 {record['code']} 的数据到 Kafka")
if __name__ == '__main__':
while True:
try:
fetch_and_send()
print("等待 1 秒...")
time.sleep(1) # Tushare Pro 的实时行情接口建议1秒调用一次
except Exception as e:
print(f"发生错误: {e}")
time.sleep(5)
步骤 3:实现数据消费与存储 (Consumer)
from kafka import KafkaConsumer
import json
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置 ---
KAFKA_BROKERS = ['localhost:9092']
KAFKA_TOPIC = 'stock-realtime'
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your_token"
INFLUXDB_ORG = "your_org"
INFLUXDB_BUCKET = "stock_market_pro"
# 初始化 InfluxDB 客户端
influx_client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# 初始化 Kafka Consumer
consumer = KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BROKERS,
auto_offset_reset='latest', # 从最新消息开始消费
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print("开始消费 Kafka 消息...")
for message in consumer:
try:
data = message.value
code = data['code']
# 构造 InfluxDB Point
point = Point("stock_realtime")
point.tag("symbol", code)
point.tag("market", data['market'])
point.field("price", float(data['price']))
point.field("open", float(data['open']))
point.field("high", float(data['high']))
point.field("low", float(data['low']))
point.field("volume", float(data['volume']))
point.field("amount", float(data['amount']))
# 使用 Kafka 消息中的时间戳,如果没有则用当前时间
# Tushare 的数据有时效性,最好用其自带的时间戳
# 这里简化处理,使用消息时间戳
write_api.write(bucket=INFLUXDB_BUCKET, record=point)
except Exception as e:
print(f"处理消息时出错: {e}, 数据: {data}")
# 注意:在实际生产环境中,Consumer 应该是持续运行的,不要轻易退出
# write_api.close()
# influx_client.close()
关键挑战与解决方案
-
数据延迟
- 问题:免费 API 延迟高,无法满足交易需求。
- 方案:使用付费数据源(如 Tushare Pro, 交易所 Level-2),对于个人,Tushare Pro 是一个很好的平衡点。
-
高并发与高吞吐
- 问题:直接写入数据库可能成为瓶颈,导致数据丢失。
- 方案:引入消息队列(如 Kafka, Pulsar),作为缓冲层,吸收流量高峰,确保数据不丢失。
-
数据持久化与查询效率
- 问题:关系型数据库处理海量时间序列数据效率低下。
- 方案:使用时序数据库(如 InfluxDB, ClickHouse),它们为时间序列数据而生,提供极高的写入和查询性能。
-
连接稳定性与重试机制
- 问题:网络不稳定或 API 限流可能导致程序中断。
- 方案:在代码中加入
try-except错误捕获,实现自动重连机制,对于 Kafka,可以配置消费者自动提交偏移量,并在重启时从上次停止的地方继续。
-
成本
- 问题:付费数据源和云服务器成本不菲。
- 方案:
- 根据业务需求选择合适的数据源层级(免费 vs. 付费)。
- 使用云服务商的按量付费或预留实例来降低服务器成本。
- 对数据进行分级存储,热数据放在高性能数据库,冷数据归档到对象存储(如 S3, OSS)。
| 特性 | 方案一 (免费+本地) | 方案二 (专业+分布式) |
|---|---|---|
| 数据源 | 新浪/腾讯等免费接口 | Tushare Pro / Wind / 交易所 |
| 延迟 | 较高 (分钟级) | 较低 (秒级或更低) |
| 架构 | 简单,单进程 | 复杂,多组件 (Producer/Kafka/Consumer) |
| 存储 | 本地文件 / 简单 DB | 时序数据库 / 分布式存储 |
| 成本 | 低 | 高 (API费用 + 服务器费用) |
| 适用场景 | 学习、个人研究、非实时监控 | 量化回测、交易系统、专业数据分析 |
对于初学者,强烈建议从方案一开始,理解数据抓取和存储的基本流程,当你对系统有更高要求时,再转向方案二,学习如何构建一个健壮、可扩展的实时数据管道。
文章版权及转载声明
作者:咔咔本文地址:https://jits.cn/content/22099.html发布于 2025-12-18
文章转载或复制请以超链接形式并注明出处杰思科技・AI 股讯


还没有评论,来说两句吧...