在量化交易系统或行情订阅程序中,WebSocket 是实现实时行情获取的关键通道。但在实际部署中,我们经常会遇到一个头痛的问题:WebSocket连接在运行一段时间后断开了,而我们的策略还以为数据一直在更新,直到发生实际交易损失。
本文将以外汇行情 WebSocket 接口为例,详细还原一次断连排查过程,并提供一个健壮的重连机制实现方案,帮助你构建高可用的实时行情接入组件。
初始接入代码示例(外汇行情)
根据infoway API的官方示例中,我们通过如下代码订阅了 BTCUSDT 的实时行情(注意:虽然标的是 crypto,Infoway 支持的外汇行情与加密币行情结构一致,实际只需修改订阅品种即可):
private static final String WS_URL =
"wss://data.infoway.io/ws?business=forex&apikey=yourApikey";
// 申请免费token: https://infoway.io// 对接文档:docs.infoway.ioJSONObject jsonObject =
new JSONObject();jsonObject.put(
"code",
10000);jsonObject.put(
"trace", UUID.randomUUID().toString());JSONObject data =
new JSONObject();data.put(
"codes",
"EURUSD");
// 示例:订阅 EUR/USD 外汇对jsonObject.put(
"data", data);session.getBasicRemote().sendText(jsonObject.toJSONString());
这是典型的基于 @ClientEndpoint 的 WebSocket 接入方式。但在运行过程中,我们观察到 30 分钟左右后,控制台停止打印行情,连接虽然没有报错,但实际上已经断开了。
问题表现
没有报错或异常(@OnError未触发);@OnClose 也未打印;行情打印停止,ping 发送不报错但无回应;监控发现 WebSocket 实际已经被远端关闭(心跳丢失)。
原因分析
此类问题在接入 WebSocket 时较为常见,可能由以下原因之一引起:
客户端网络瞬断;长时间未发送心跳或心跳机制失效;远端服务端定期断开空闲连接;Session 对象未能正确检测“半连接”状态(即 TCP 已断但对象仍可调用);Java WebSocket API 没有内建自动重连机制。
重连机制设计思路
为了解决这个问题,我们需要构建一个健壮的“自动重连机制”,核心包括:
监听连接状态:@OnClose 和 @OnError 都要触发重连;每次连接失败要有退避策略(避免死循环尝试);在主线程或守护线程中保持连接活跃监测;支持手动关闭与恢复机制,防止重复连接。
改造后的关键代码(支持自动重连)
package org.example.ws;
import com.alibaba.fastjson2.JSONObject;
import jakarta.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.*;
// 申请免费token: https://infoway.io// 对接文档:docs.infoway.iopublic class ReconnectableWSClient {
private static Session session;
private static final String WS_URL =
"wss://data.infoway.io/ws?business=forex&apikey=yourApikey";
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
2);
private static final int RECONNECT_DELAY =
5;
// secondsprivate static volatile boolean userClose =
false;
public static void main(String[] args) {connectWithRetry();scheduler.scheduleAtFixedRate(ReconnectableWSClient::ping,
30,
30, TimeUnit.SECONDS);}
private static void connectWithRetry() {scheduler.execute(() -> {
while (
true) {
try {WebSocketContainer container = ContainerProvider.getWebSocketContainer();session = container.connectToServer(MyClientEndpoint
.class, URI.create(WS_URL));subscribeForex(
"EURUSD");
break;}
catch (Exception e) {System.err.println(
"连接失败," + RECONNECT_DELAY +
"秒后重试...");
try {TimeUnit.SECONDS.sleep(RECONNECT_DELAY);}
catch (InterruptedException ignored) {}}}});}
private static void subscribeForex(String symbol) throws IOException {JSONObject jsonObject =
new JSONObject();jsonObject.put(
"code",
10000);jsonObject.put(
"trace", UUID.randomUUID().toString());JSONObject data =
new JSONObject();data.put(
"codes", symbol);jsonObject.put(
"data", data);session.getBasicRemote().sendText(jsonObject.toJSONString());}
private static void ping() {
if (session !=
null && session.isOpen()) {
try {JSONObject jsonObject =
new JSONObject();jsonObject.put(
"code",
10010);jsonObject.put(
"trace", UUID.randomUUID().toString());session.getBasicRemote().sendText(jsonObject.toJSONString());}
catch (IOException e) {System.err.println(
"Ping发送失败,尝试重连...");reconnect();}}}
private static void reconnect() {
if (userClose)
return;
try {
if (session !=
null && session.isOpen()) {session.close();}}
catch (IOException ignored) {}connectWithRetry();}
@ClientEndpointpublic static class MyClientEndpoint {
@OnOpenpublic void onOpen(Session session) {System.out.println(
"连接已建立:" + session.getId());}
@OnMessagepublic void onMessage(String message) {System.out.println(
"收到数据:" + message);}
@OnClosepublic void onClose(Session session, CloseReason reason) {System.err.println(
"连接关闭,原因:" + reason);reconnect();}
@OnErrorpublic void onError(Session session, Throwable throwable) {System.err.println(
"发生错误:" + throwable.getMessage());reconnect();}}}
经验总结
WebSocket 是实时行情系统的核心通道,必须做到可用性高、断线可恢复;Java 的 WebSocket API 原生不支持重连,需要手动封装;合理使用线程池和状态控制,可以避免连接雪崩或资源泄露;实际部署中建议使用独立监控线程检测长时间无数据/心跳响应,进一步增强稳定性。