This commit is contained in:
tangzh 2025-06-07 13:41:39 +08:00
parent 6d41bb9925
commit d02c26b830
12 changed files with 1044 additions and 407 deletions

View File

@ -17,6 +17,20 @@
</properties> </properties>
<dependencies> <dependencies>
<!-- FFmpeg 包装器 -->
<dependency>
<groupId>ws.schild</groupId>
<artifactId>jave-all-deps</artifactId>
<version>3.3.1</version>
</dependency>
<!-- 视频流处理 -->
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>javacv-platform</artifactId>
<version>1.5.6</version>
</dependency>
<!-- 代码生成模块 --> <!-- 代码生成模块 -->
<dependency> <dependency>
<groupId>me.zhengjie</groupId> <groupId>me.zhengjie</groupId>

View File

@ -3,8 +3,12 @@ package me.zhengjie.modules.security.config.enums;
public enum CapabilitieEnum { public enum CapabilitieEnum {
FILE_TRANSFER, // 支持文件传输 PAUSE, // 暂停
PRINT_CONTROL, // 支持打印控制 CONTINUE, // 继续
VIDEO_STREAM; // 支持视频流传输 EXIT, // 退出
IMAGE, // 图片
OPEN_VIDEO, // 打开-视频
CLOSE_VIDEO, // 关闭-视频
;
} }

View File

@ -8,6 +8,9 @@ public enum MethodEnum {
status, status,
attributes, attributes,
error, error,
notice; notice,
close,
disconnect,
;
} }

View File

@ -25,5 +25,6 @@ public enum MsgEnum {
/** 信息 */ /** 信息 */
INFO, INFO,
/** 错误 */ /** 错误 */
ERROR ERROR,
;
} }

View File

@ -0,0 +1,59 @@
package me.zhengjie.modules.system.service;
import me.zhengjie.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketService {
public static BusDeviceService myDeviceService;
@Autowired private BusDeviceService deviceService;
// 初始化
@PostConstruct
public void init() {
if (null == myDeviceService) {
myDeviceService = deviceService;
}
}
// 存储设备号
public static List<String> deviceList = new ArrayList<>();
// 存储连接的客户端
public String channel;
public Session session;
public static final ConcurrentHashMap<String, WebSocketService> clients = new ConcurrentHashMap<>();
public WebSocketService getSocket(String key) {
if (!StringUtils.isEmpty(key)) {
return clients.get(key);
}
for (WebSocketService socketServer : clients.values()) {
if (socketServer.equals(this)) {
return socketServer;
}
}
return null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WebSocketService that = (WebSocketService) o;
return Objects.equals(session, that.session) && Objects.equals(channel, that.channel);
}
}

View File

@ -0,0 +1,256 @@
package me.zhengjie.modules.system.service.impl;//package me.zhengjie.modules.system.service.webstocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.security.config.WebSocketConfig;
import me.zhengjie.modules.security.config.enums.*;
import me.zhengjie.modules.system.domain.BusDevice;
import me.zhengjie.modules.system.service.WebSocketService;
import me.zhengjie.modules.system.service.webstocket.SocketMsg;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqDTO;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqData;
import me.zhengjie.modules.system.service.webstocket.res.*;
import me.zhengjie.utils.StringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ServerEndpoint("/webSocket/sdcp")
@Slf4j
@Component
public class WebSocketSdcpServiceImpl extends WebSocketService {
/**
* 连接调用方法
* @param session
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.channel = WebSocketConfig.TOPIC_PREFIX;
log.info("【SDCP服务端】连接成功channel={}", channel);
clients.put(channel, this);
openDeviceStatus();
}
/**
* 连接关闭调用的方法
* @param session
*/
@OnClose
public void onClose(Session session) {
WebSocketService collect = getSocket(WebSocketConfig.TOPIC_PREFIX);
if (null == collect) { return; }
log.info("【SDCP服务端】连接断开channel={}", collect.channel);
closeDeviceStatus();
}
/**
* 连接异常时
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
WebSocketService collect = getSocket(WebSocketConfig.TOPIC_PREFIX);
if (null == collect) { return; }
log.info("【SDCP服务端】发现异常channel={}", collect.channel);
error.printStackTrace();
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(Session session, String message) {
WebSocketService collect = getSocket(WebSocketConfig.TOPIC_PREFIX);
if (null == collect) { return; }
log.info("【SDCP服务端】收到消息channel={}message={}", collect.channel, message);
serviceMessageHandle(session, message);
}
// 连接关闭状态刷新
private void closeDeviceStatus() {
for (String deviceSn : deviceList) {
WebSocketReqDTO reqDTO = new WebSocketReqDTO();
reqDTO.setTopic(WebSocketConfig.TOPIC_PREFIX + "/" + MethodEnum.close.name() + "/" + deviceSn);
serviceMessageHandle(null, JSON.toJSONString(reqDTO));
}
}
// 初始状态刷新消息(Cmd:0) 指令
private void openDeviceStatus() {
for (String deviceSn : deviceList) {
long currentTimeLong = System.currentTimeMillis();
WebSocketReqDTO reqDTO = new WebSocketReqDTO();
reqDTO.setTopic(WebSocketConfig.TOPIC_PREFIX + "/" + MethodEnum.request.name() + "/" + deviceSn);
WebSocketReqData data = new WebSocketReqData();
data.setCmd(0);
data.setData(new Object());
data.setRequestID(currentTimeLong + "");
data.setMainboardID(deviceSn);
data.setTimeStamp(currentTimeLong);
data.setFrom(0);
reqDTO.setData(data);
serviceMessageHandle(null, JSON.toJSONString(reqDTO));
}
}
// 处理服务端接收到的消息
private void serviceMessageHandle(Session session, String message) {
if (StringUtils.isEmpty(message)) { return; }
// 如果是第三方发来的[心跳]检测返回 pong
if (WebSocketConfig.PING.equals(message)) {
log.info("<<< 心跳反馈");
try { session.getBasicRemote().sendText(WebSocketConfig.PONG);
} catch (IOException e) { throw new RuntimeException(e); }
return;
}
message = message.replace("\\", "");
WebSocketResDTO param = null;
MethodEnum method = null;
try {
param = JSON.parseObject(message, WebSocketResDTO.class);
method = param.getMethod();
} catch (Exception e) { e.printStackTrace(); }
if (null == method) {
log.info("<<< 请求类型有误,忽略");
return;
}
log.info(">>> 方式 {}", method);
String deviceSn = param.getMainboardID();
// 只关心已有的设备
if (!deviceList.contains(deviceSn)) {
log.info("<<< 设备【{}】在系统未找到,忽略", deviceSn);
return;
}
if (method == MethodEnum.request) {
try {
WebSocketService socket = getSocket(WebSocketConfig.TOPIC_PREFIX);
socket.session.getBasicRemote().sendText(message);
} catch (IOException e) { throw new RuntimeException(e); }
return;
}
WebSocketResDTO resDTO = JSON.parseObject(message, WebSocketResDTO.class);
if (method == MethodEnum.response) {
// 指令响应消息
WebSocketResData data = param.getData();
Integer cmd = data.getCmd();
// 指令-获取图片
if (cmd.equals(385)) {
JSONObject cData = JSON.parseObject(data.getData().toString());
Map<String, Object> commandResult = new HashMap<>();
int Ack = Integer.parseInt(cData.get("Ack").toString());
if (Ack == 0) {
commandResult.put("deviceSn", deviceSn);
commandResult.put("imageType", cData.get("ImageType"));
commandResult.put("imageData", cData.get("ImageData"));
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "指令响应成功", MsgEnum.INFO)));
} else if (Ack == 1) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "图片获取失败", MsgEnum.ERROR)));
} else if (Ack == 2) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "不支持该类型图片", MsgEnum.ERROR)));
} else if (Ack == 3) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "未知错误", MsgEnum.ERROR)));
}
}
// 指令-打开/关闭视频
else if (cmd.equals(386)) {
JSONObject cData = JSON.parseObject(data.getData().toString());
int Ack = Integer.parseInt(cData.get("Ack").toString());
Map<String, Object> commandResult = new HashMap<>();
if (Ack == 0) {
commandResult.put("deviceSn", deviceSn);
commandResult.put("videoUrl", cData.get("VideoUrl"));
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "指令响应成功", MsgEnum.INFO)));
} else if (Ack == 1) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "超过最大同时拉流限制", MsgEnum.ERROR)));
} else if (Ack == 2) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "摄像头不存在", MsgEnum.ERROR)));
} else if (Ack == 3) {
commandResult.put("deviceSn", deviceSn);
sendVueMessage(JSON.toJSONString(new SocketMsg(commandResult, "未知错误", MsgEnum.ERROR)));
}
}
} else if (method == MethodEnum.status) {
WebSocketResStatus status = resDTO.getStatus();
WebSocketResPrintInfo printInfo = status.getPrintInfo();
myDeviceService.updateDeviceStatus(
deviceSn,
CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()),
PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()),
ErrorStatusEnum.SDCP_PRINT_ERROR_NONE,
null
);
sendVueMessage(JSON.toJSONString(new SocketMsg(deviceSn, MsgEnum.INFO)));
} else if (method == MethodEnum.attributes) {
WebSocketResAttributes attributes = resDTO.getAttributes();
// String[] capabilities = attributes.getCapabilities();
BusDevice entity = new BusDevice();
entity.setDeviceSn(deviceSn);
entity.setName(attributes.getName());
entity.setModel(attributes.getMachineName());
entity.setFirmwareVersion(attributes.getFirmwareVersion());
entity.setProtocolVersion(attributes.getProtocolVersion());
// List<BusDeviceCommand> commandList = new ArrayList<>();
// for (String capability : capabilities) {
// BusDeviceCommand command = new BusDeviceCommand();
// command.setCommandType(capability);
// commandList.add(command);
// }
// entity.setCommandList(commandList);
myDeviceService.updateOrAdd(entity);
} else if (method == MethodEnum.error) {
WebSocketResStatus status = resDTO.getStatus();
WebSocketResPrintInfo printInfo = status.getPrintInfo();
myDeviceService.updateDeviceStatus(
deviceSn,
CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()),
PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()),
ErrorStatusEnum.SDCP_PRINT_ERROR_NONE,
null
);
sendVueMessage(JSON.toJSONString(new SocketMsg(deviceSn, MsgEnum.ERROR)));
} else if (method == MethodEnum.notice) {
} else if (method == MethodEnum.close || method == MethodEnum.disconnect) {
String msg = (method == MethodEnum.close ? "已关闭连接" : "已断开连接");
MsgEnum en = (method == MethodEnum.close ? MsgEnum.CLOSE : MsgEnum.DISCONNECT);
myDeviceService.updateDeviceStatus(
null,
CurrentStatusEnum.SDCP_MACHINE_STATUS_DEFAULT,
PrintInfoStatusEnum.SDCP_PRINT_STATUS_DEFAULT,
ErrorStatusEnum.SDCP_PRINT_ERROR_DEFAULT,
msg
);
sendVueMessage(JSON.toJSONString(new SocketMsg(msg, en)));
}
}
// 发送消息给第三方
private void sendVueMessage(String message) {
for (Map.Entry<String, WebSocketService> entry : clients.entrySet()) {
try {
WebSocketService server = entry.getValue();
String channel = server.channel;
Session session = server.session;
if (channel.equals(WebSocketConfig.TOPIC_PREFIX) || null == session || !session.isOpen()) { continue; }
log.info(">>> 推送消息给VUEchannel={}message={}", channel, message);
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
log.info("<<< 推送消息给VUE-异常 原因:", e);
}
}
}
}

View File

@ -0,0 +1,186 @@
package me.zhengjie.modules.system.service.impl;//package me.zhengjie.modules.system.service.webstocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.security.config.WebSocketConfig;
import me.zhengjie.modules.security.config.enums.CapabilitieEnum;
import me.zhengjie.modules.security.config.enums.MethodEnum;
import me.zhengjie.modules.system.domain.BusDevice;
import me.zhengjie.modules.system.domain.dto.BusDeviceQueryCriteria;
import me.zhengjie.modules.system.service.WebSocketService;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqDTO;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqData;
import me.zhengjie.utils.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
@ServerEndpoint("/webSocket/vue")
@Slf4j
@Component
public class WebSocketVueServiceImpl extends WebSocketService {
// 初始化
@PostConstruct
public void init() {
log.info(">>> 设备初始化");
refreshDeviceList();
log.info("<<< 设备初始化");
}
@Scheduled(fixedRate = 60000)
public void refreshDeviceList() {
deviceList = new ArrayList<>();
List<BusDevice> devices = myDeviceService.queryAll(new BusDeviceQueryCriteria());
for (BusDevice device : devices) {
deviceList.add(device.getDeviceSn());
}
}
/**
* 连接调用方法
* @param session
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.channel = session.getId();
log.info("【VUE服务端】连接成功channel={}", channel);
clients.put(channel, this);
}
/**
* 连接关闭调用的方法
* @param session
*/
@OnClose
public void onClose(Session session) {
WebSocketService collect = getSocket(session.getId());
if (null == collect) { return; }
log.info("【VUE服务端】连接断开channel={}", collect.channel);
}
/**
* 连接异常时
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
WebSocketService collect = getSocket(session.getId());
if (null == collect) { return; }
log.info("【VUE服务端】发现异常channel={}", collect.channel);
error.printStackTrace();
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(Session session, String message) {
WebSocketService collect = getSocket(session.getId());
if (null == collect) { return; }
log.info("【VUE服务端】收到消息channel={}message={}", collect.channel, message);
serviceMessageHandle(message);
}
private void serviceMessageHandle(String message) {
if (StringUtils.isEmpty(message)) { return; }
// 发送的指令
message = message.replace("\\", "");
WebSocketReqDTO param = null;
MethodEnum method = null;
try {
param = JSON.parseObject(message, WebSocketReqDTO.class);
method = param.getMethod();
} catch (Exception e) { e.printStackTrace(); }
if (null == method || method != MethodEnum.request) {
log.info("<<< 请求类型有误,忽略");
return;
}
log.info(">>> 方式 {}", method);
String deviceSn = param.getMainboardID();
// 只关心已有的设备
if (!deviceList.contains(deviceSn)) {
log.info("<<< 设备【{}】在系统未找到,忽略", deviceSn);
return;
}
String command = param.getCommand();
if (null == command) {
log.info("<<< 命令类型未找到,忽略");
return;
}
String commandStr = null;
long sysTimeLong = System.currentTimeMillis();
// 暂停
if (command.equals(CapabilitieEnum.PAUSE.name())) {
}
// 继续
else if (command.equals(CapabilitieEnum.CONTINUE.name())) {
}
// 退出
else if (command.equals(CapabilitieEnum.EXIT.name())) {
}
// 图片
else if (command.equals(CapabilitieEnum.IMAGE.name())) {
Map<String, Object> dataParam = new HashMap<>();
dataParam.put("Type", 0);
WebSocketReqData data = new WebSocketReqData();
data.setCmd(385);
data.setData(dataParam);
data.setRequestID(sysTimeLong + "");
data.setMainboardID(deviceSn);
data.setTimeStamp(sysTimeLong);
WebSocketReqDTO req = new WebSocketReqDTO();
req.setTopic(WebSocketConfig.TOPIC_PREFIX + "/" + MethodEnum.request + "/" + deviceSn);
req.setData(data);
commandStr = JSON.toJSONString(req);
}
// 打开视频
else if (command.equals(CapabilitieEnum.OPEN_VIDEO.name()) || command.equals(CapabilitieEnum.CLOSE_VIDEO.name())) {
Map<String, Object> dataParam = new HashMap<>();
dataParam.put("Enable", command.equals(CapabilitieEnum.OPEN_VIDEO.name()) ? 1 : 0);
WebSocketReqData data = new WebSocketReqData();
data.setCmd(386);
data.setData(dataParam);
data.setRequestID(sysTimeLong + "");
data.setMainboardID(deviceSn);
data.setTimeStamp(sysTimeLong);
WebSocketReqDTO req = new WebSocketReqDTO();
req.setTopic(WebSocketConfig.TOPIC_PREFIX + "/" + MethodEnum.request + "/" + deviceSn);
req.setData(data);
commandStr = JSON.toJSONString(req);
}
else {
log.info("<<< 命令类型未找到,忽略");
return;
}
sendThirdPartyMessage(commandStr);
}
// 发送消息给第三方
private void sendThirdPartyMessage(String message) {
for (Map.Entry<String, WebSocketService> entry : clients.entrySet()) {
try {
WebSocketService server = entry.getValue();
String channel = server.channel;
Session session = server.session;
if (!channel.equals(WebSocketConfig.TOPIC_PREFIX) || null == session || !session.isOpen()) { continue; }
log.info(">>> 发送指令消息给第三方channel={}message={}", channel, message);
session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
log.info("<<< 发送指令消息给第三方-异常 原因:", e);
}
}
}
}

View File

@ -20,6 +20,8 @@ import me.zhengjie.modules.security.config.enums.MsgEnum;
@Data @Data
public class SocketMsg { public class SocketMsg {
private Object data;
private String msg; private String msg;
private MsgEnum msgType; private MsgEnum msgType;
@ -27,4 +29,10 @@ public class SocketMsg {
this.msg = msg; this.msg = msg;
this.msgType = msgType; this.msgType = msgType;
} }
public SocketMsg(Object data, String msg, MsgEnum msgType) {
this.data = data;
this.msg = msg;
this.msgType = msgType;
}
} }

View File

@ -32,6 +32,7 @@ public class WebSocketParam {
} }
public MethodEnum getMethod() { public MethodEnum getMethod() {
if (StringUtil.isEmpty(getTopic())) { return null; }
for (MethodEnum os : MethodEnum.values()) { for (MethodEnum os : MethodEnum.values()) {
if (getTopic().contains(os.name())) { if (getTopic().contains(os.name())) {
return os; return os;

View File

@ -1,357 +0,0 @@
package me.zhengjie.modules.system.service.webstocket;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import me.zhengjie.modules.security.config.WebSocketConfig;
import me.zhengjie.modules.security.config.enums.*;
import me.zhengjie.modules.system.domain.BusDevice;
import me.zhengjie.modules.system.domain.BusDeviceCommand;
import me.zhengjie.modules.system.domain.dto.BusDeviceQueryCriteria;
import me.zhengjie.modules.system.service.BusDeviceService;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqDTO;
import me.zhengjie.modules.system.service.webstocket.req.WebSocketReqData;
import me.zhengjie.modules.system.service.webstocket.res.WebSocketResAttributes;
import me.zhengjie.modules.system.service.webstocket.res.WebSocketResDTO;
import me.zhengjie.modules.system.service.webstocket.res.WebSocketResPrintInfo;
import me.zhengjie.modules.system.service.webstocket.res.WebSocketResStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
@ServerEndpoint("/webSocket")
@Slf4j
@Component
public class WebSocketServer {
@Autowired private BusDeviceService deviceService;
private static BusDeviceService myDeviceService;
// 存储设备
private static final CopyOnWriteArraySet<String> deviceList = new CopyOnWriteArraySet<>();
// 存储连接的前端客户端
private static final ConcurrentHashMap<String, Session> vueClients = new ConcurrentHashMap<>();
// 第三方WebSocket连接
@Value("${thirdParty.socket.init}")
private Boolean thirdPartyInit;
@Value("${thirdParty.socket.url}")
private String thirdPartyWsUrl;
private static Session thirdPartySession;
private static WebSocketContainer thirdPartyContainer;
// 重连机制
private final ScheduledExecutorService reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
// 初始化连接第三方WebSocket
@PostConstruct
public void initThirdPartyWebSocket() {
if (!thirdPartyInit) { return; }
if (null != thirdPartyContainer && null != thirdPartySession) { return; }
try {
thirdPartyContainer = ContainerProvider.getWebSocketContainer();
thirdPartySession = thirdPartyContainer.connectToServer(ThirdPartyClient.class, URI.create(thirdPartyWsUrl));
log.info("【客户端->第三方】连接成功");
} catch (Exception e) {
e.printStackTrace();
log.info("【客户端->第三方】连接失败等待5秒重新连接");
scheduleReconnect();
}
if (null == myDeviceService) {
myDeviceService = deviceService;
}
}
// 安排重连任务
private void scheduleReconnect() {
reconnectExecutor.schedule(this::initThirdPartyWebSocket, 5, TimeUnit.SECONDS);
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
log.info("【VUE->服务端】连接成功sessionId={}", session.getId());
vueClients.put(session.getId(), session);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
log.info("【VUE->服务端】连接断开sessionId={}", session.getId());
vueClients.remove(session.getId());
}
/**
* 连接异常时
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.info("【VUE->服务端】发现异常sessionId={}", session.getId());
error.printStackTrace();
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message) {
log.info("WebSocketServer onMessage 收到消息={}", message);
sendMessage(message);
}
/**
* 主推消息给客户端
*/
private void sendMessage(String message) {
// 根据类型操作不同的消息
// todo 如果是指令则转发给第三方WebSocket
if (!(thirdPartySession != null && thirdPartySession.isOpen())) {
log.info("指令发送失败,原因: websocket未连接");
return;
}
try {
thirdPartySession.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
log.info("指令发送异常,原因:", e);
}
}
// 处理来自第三方WebSocket的消息并转发给Vue前端
@ClientEndpoint
public static class ThirdPartyClient {
private Timer timer;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳间隔
@Autowired @Lazy
private WebSocketServer parentService; // 注入父类Bean
public void callParentInitThirdPartyWebSocket() {
parentService.initThirdPartyWebSocket();
}
@OnOpen
public void onOpen(Session session) {
log.info("【第三方客户端】连接成功");
initDevices();
startHeartbeat();
socketMsg(new SocketMsg("已连接", MsgEnum.CONNECT));
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("【第三方客户端】收到消息:{}", message);
messageReceiveHandle(message);
}
@OnClose
public void onClose(Session session) {
log.info("【第三方客户端】连接断开");
socketMsg(new SocketMsg("连接断开", MsgEnum.DISCONNECT));
try {
thirdPartySession.close();
thirdPartySession = null;
thirdPartyContainer = null;
callParentInitThirdPartyWebSocket();
startHeartbeat();
} catch (Exception e) {}
}
@OnError
public void onError(Session session, Throwable error) {
log.info("【第三方客户端】发现异常,原因:", error);
socketMsg(new SocketMsg("连接异常", MsgEnum.ERROR));
}
private void startHeartbeat() {
timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
messageSendHandle(WebSocketConfig.PING);
}
}, 0, HEARTBEAT_INTERVAL);
}
private void stopHeartbeat() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
// 收到消息解析做响应的处理
private void messageReceiveHandle(String message) {
// 如果是 [心跳] 检测忽略
if (WebSocketConfig.PONG.equals(message)) {
log.info("<<< 心跳反馈,忽略");
return;
}
// 其余信息是第三方推送过来的消息 / 发送的指令
message = message.replace("\\", "");
WebSocketParam param = JSON.parseObject(message, WebSocketParam.class);
MethodEnum method = param.getMethod();
String deviceSn = param.getMainboardID();
log.info("<<< 方式 {}", method.name());
// 只关心我已有的设备
if (!deviceList.contains(deviceSn)) {
log.info("<<< 设备【{}】在系统未找到,忽略", deviceSn);
return;
}
BusDevice device = myDeviceService.getByDeviceSn(deviceSn);
// request 说明是vue要发送指令给第三方
if (method == MethodEnum.request) {
WebSocketReqDTO requestReq = JSON.parseObject(message, WebSocketReqDTO.class);
if (null == requestReq.getCommand()) {
log.info("<<< 命令类型未找到");
return;
}
// todo 发送指令给第三方
// CapabilitieEnum command = requestReq.getCommand();
// messageSendHandle(null);
}
// 其他说明是第三方上推给我们客服端
else {
WebSocketResDTO resDTO = JSON.parseObject(message, WebSocketResDTO.class);
if (method == MethodEnum.response) {
// List<BusDevice> devices = new ArrayList<>();
// BusDevice device = new BusDevice();
// device.setId();
// devices.add(device);
// pushVueClients();
} else if (method == MethodEnum.status) {
WebSocketResStatus status = resDTO.getStatus();
WebSocketResPrintInfo printInfo = status.getPrintInfo();
myDeviceService.updateDeviceStatus(
deviceSn,
CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()),
PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()),
ErrorStatusEnum.SDCP_PRINT_ERROR_NONE,
null
);
BusDevice info = new BusDevice();
info.setDeviceCode(device.getDeviceCode());
info.setStatus(CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()).getCode());
info.setPrintStatus(PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()).getCode());
info.setErrorStatus(ErrorStatusEnum.SDCP_PRINT_ERROR_NONE.getCode());
socketMsg(new SocketMsg(JSON.toJSONString(info), MsgEnum.INFO));
} else if (method == MethodEnum.attributes) {
WebSocketResAttributes attributes = resDTO.getAttributes();
String[] capabilities = attributes.getCapabilities();
BusDevice entity = new BusDevice();
entity.setDeviceSn(deviceSn);
entity.setName(attributes.getName());
entity.setModel(attributes.getMachineName());
entity.setFirmwareVersion(attributes.getFirmwareVersion());
entity.setProtocolVersion(attributes.getProtocolVersion());
List<BusDeviceCommand> commandList = new ArrayList<>();
for (String capability : capabilities) {
BusDeviceCommand command = new BusDeviceCommand();
command.setCommandType(capability);
commandList.add(command);
}
entity.setCommandList(commandList);
myDeviceService.updateOrAdd(entity);
} else if (method == MethodEnum.error) {
WebSocketResStatus status = resDTO.getStatus();
WebSocketResPrintInfo printInfo = status.getPrintInfo();
myDeviceService.updateDeviceStatus(
deviceSn,
CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()),
PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()),
ErrorStatusEnum.SDCP_PRINT_ERROR_NONE,
null
);
BusDevice info = new BusDevice();
info.setDeviceCode(device.getDeviceCode());
info.setStatus(CurrentStatusEnum.getEnumByNum(status.getCurrentStatus()).getCode());
info.setPrintStatus(PrintInfoStatusEnum.getEnumByNum(printInfo.getStatus()).getCode());
info.setErrorStatus(ErrorStatusEnum.SDCP_PRINT_ERROR_NONE.getCode());
socketMsg(new SocketMsg(JSON.toJSONString(info), MsgEnum.ERROR));
} else if (method == MethodEnum.notice) {
}
}
}
private void socketMsg(SocketMsg socketMsg) {
String message = JSON.toJSONString(socketMsg);
pushVueClients(message);
}
// 将消息广播给所有前端客户端
private void pushVueClients(String message) {
vueClients.forEach((id, clientSession) -> {
try {
clientSession.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
});
}
// 发送消息给第三方
private void messageSendHandle(String message) {
if (!(thirdPartySession != null && thirdPartySession.isOpen())) {
log.info("<<< 发送消息给第三方-失败 原因: websocket 已关闭");
return;
}
try {
log.info(">>> 发送消息给第三方 {}", message);
thirdPartySession.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
log.info("<<< 发送消息给第三方-异常 原因:", e);
}
}
/**
* 初始化设备并向第三方发指令为了拿到设备的状态
*/
private void initDevices() {
log.info(">>> 设备初始化");
List<BusDevice> devices = myDeviceService.queryAll(new BusDeviceQueryCriteria());
for (BusDevice device : devices) {
String deviceSn = device.getDeviceSn();
long currentTimeLong = System.currentTimeMillis();
WebSocketReqDTO reqDTO = new WebSocketReqDTO();
reqDTO.setTopic(WebSocketConfig.TOPIC_PREFIX + "/" + MethodEnum.request.name() + "/" + deviceSn);
WebSocketReqData data = new WebSocketReqData();
data.setCmd(0);
data.setData(new Object());
data.setRequestID(currentTimeLong + "");
data.setMainboardID(device.getDeviceSn());
data.setTimeStamp(currentTimeLong);
data.setFrom(0);
reqDTO.setData(data);
// 发送 状态刷新消息(Cmd:0) 指令
messageSendHandle(JSON.toJSONString(reqDTO));
// 订阅设备
deviceList.add(deviceSn);
}
log.info("<<< 设备初始化");
}
}
}

View File

@ -1,7 +1,6 @@
package me.zhengjie.modules.system.service.webstocket.req; package me.zhengjie.modules.system.service.webstocket.req;
import lombok.Data; import lombok.Data;
import me.zhengjie.modules.security.config.enums.CapabilitieEnum;
import me.zhengjie.modules.system.service.webstocket.WebSocketParam; import me.zhengjie.modules.system.service.webstocket.WebSocketParam;
import java.io.Serializable; import java.io.Serializable;
@ -15,16 +14,7 @@ public class WebSocketReqDTO extends WebSocketParam implements Serializable {
// 机器品牌标识32位UUID // 机器品牌标识32位UUID
private String Id; private String Id;
// 指令类型 // 指令类型 CapabilitieEnum
private CapabilitieEnum command; private String command;
public CapabilitieEnum getCommand() {
for (CapabilitieEnum os : CapabilitieEnum.values()) {
if (getTopic().contains(os.name())) {
return os;
}
}
return null;
}
} }

File diff suppressed because one or more lines are too long