Erlo

【Spring Boot】集成Netty Socket.IO通讯框架

2019-01-10 19:01:41 发布   837 浏览  
页面报错/反馈
收藏 点赞

服务端

 1 @Configuration
 2 public class NettySocketConfig {
 3 
 4     private static final Logger logger = LoggerFactory.getLogger(NettySocketConfig.class);
 5 
 6     @Bean
 7     public SocketIOServer socketIOServer() {
 8         //创建Socket,并设置监听端口
 9         com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
10         // 设置主机名,默认是0.0.0.0
11         config.setHostname("192.168.8.107");
12         // 设置监听端口
13         config.setPort(9096);
14         // 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
15         config.setUpgradeTimeout(10000);
16         // Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
17         config.setPingInterval(60000);
18         // Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
19         config.setPingTimeout(180000);
20         // 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间
21         config.setAuthorizationListener(new AuthorizationListener() {
22             @Override
23             public boolean isAuthorized(HandshakeData data) {
24                 // 可以使用如下代码获取用户密码信息
25                 //String username = data.getSingleUrlParam("username");
26                 //String password = data.getSingleUrlParam("password");
27                 //logger.info("连接参数:username=" + username + ",password=" + password);
28                 //ManagerInfo managerInfo = managerInfoService.findByUsername(username);
29                 //
30                 //String salt = managerInfo.getSalt();
31                 //String encodedPassword = ShiroKit.md5(password, username + salt);
32                 //// 如果认证不通过会返回一个Socket.EVENT_CONNECT_ERROR事件
33                 //return encodedPassword.equals(managerInfo.getPassword());
34 
35                 return true;
36             }
37         });
38 
39         final SocketIOServer server = new SocketIOServer(config);
40         System.out.println("注入SocketIOServer");
41         return server;
42     }
43 
44     @Bean
45     public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
46         return new SpringAnnotationScanner(socketServer);
47     }
48 }

 

  1 @Component
  2 public class MessageEventHandler {
  3 
  4     private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
  5 
  6     /**
  7      * 服务器socket对象
  8      */
  9     public static SocketIOServer socketIoServer;
 10 
 11     /**
 12      * 客户端集合
 13      */
 14     static ArrayList<UUID> listClient = new ArrayList<>();
 15 
 16     /**
 17      * 超时时间
 18      */
 19     static final int limitSeconds = 60;
 20 
 21     @Autowired
 22     public LoginService loginService;
 23 
 24     /**
 25      * 初始化消息事件处理器
 26      *
 27      * @param server 服务器socket对象
 28      */
 29     @Autowired
 30     public MessageEventHandler(SocketIOServer server) {
 31         logger.info("初始化SOCKET消息事件处理器");
 32         this.socketIoServer = server;
 33     }
 34 
 35     /**
 36      * 客户端发起连接时触发
 37      *
 38      * @param client 客户端Socket对象信息
 39      */
 40     @OnConnect
 41     public void onConnect(SocketIOClient client) {
 42         logger.info("客户端{}已连接", client.getSessionId());
 43         listClient.add(client.getSessionId());
 44     }
 45 
 46     /**
 47      * 客户端断开连接时触发
 48      *
 49      * @param client 客户端Socket对象信息
 50      */
 51     @OnDisconnect
 52     public void onDisconnect(SocketIOClient client) {
 53         logger.info("客户端{}断开连接", client.getSessionId());
 54         if (listClient.contains(client.getSessionId())) {
 55             listClient.remove(client.getSessionId());
 56         }
 57     }
 58 
 59 
 60     /**
 61      * 客户端发送消息时触发
 62      *
 63      * @param client  客户端Socket对象信息
 64      * @param request AckRequest 回调对象
 65      * @param data    消息信息实体
 66      */
 67     @OnEvent(value = SocketConstants.SocketEvent.MESSAGE)
 68     public void onEvent(SocketIOClient client, AckRequest request, MessageInfo data) {
 69         System.out.println("发来消息:" + data.getMsgContent());
 70         socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
 71     }
 72 
 73     /**
 74      * 效验连接事件并存储客户端信息
 75      *
 76      * @param client  客户端Socket对象信息
 77      * @param data    客户端数据
 78      * @param request AckRequest 回调对象
 79      */
 80     @OnEvent(value = SocketConstants.SocketEvent.HEALTH_CHECK)
 81     public void onEventByHealthCheck(SocketIOClient client, String data, AckRequest request) {
 82         //logger.info("客户端{}效验连接请求", client.getSessionId());
 83         ////解析请求数据
 84         //HealthCheckRequest healthCheckRequest = JSON.parseObject(data, HealthCheckRequest.class);
 85         //if (healthCheckRequest != null) {
 86         //    //存储客户端信息
 87         //    SocketInstance instance = SocketInstance.getSocketInstance();
 88         //    System.out.println(data);
 89         //    instance.insertSocketClient(healthCheckRequest.getEnCode(), client);
 90         //    logger.info("客户端{}效验连接响应:{}", client.getSessionId(), "OK");
 91         //    //响应客户端
 92         //    request.sendAckData("OK");
 93         //}
 94     }
 95 
 96     /**
 97      * 登录事件
 98      *
 99      * @param client  客户端Socket对象信息
100      * @param data    客户端数据
101      * @param request AckRequest 回调对象
102      */
103     @OnEvent(value = SocketConstants.SocketEvent.LOGIN)
104     public void onEventByLogin(SocketIOClient client, String data, AckRequest request) {
105         logger.info("客户端{}登录请求:{}", client.getSessionId(), data);
106         AppResponseBase appResponseBase = new AppResponseBase(0, "通讯成功");
107         //业务响应对象
108         LoginResponse loginResponse = null;
109         try {
110             //解析请求数据
111             LoginRequest loginRequest = JSON.parseObject(data, LoginRequest.class);
112             if (loginRequest == null) {
113                 throw new AppException(AppResultCode.LoginAnalysis_Fail);
114             }
115             //调用登陆接口
116             loginResponse = loginService.appLogin(loginRequest);
117             if (loginResponse == null) {
118                 throw new AppException(AppResultCode.LoginCloud_Fail);
119             }
120             if (EnumResult.Success.equals(loginResponse.getResultCode())) {
121                 //保存客户端Socket信息
122                 SocketInstance instance = SocketInstance.getSocketInstance();
123                 instance.insertSocketClient(loginRequest.getEnCode(), client);
124             }
125         } catch (AppException ex) {
126             loginResponse = new LoginResponse(ex.getAppResultCode().getCode(), ex.getAppResultCode().getMsg());
127         } catch (Exception ex) {
128             loginResponse = new LoginResponse(AppResultCode.Exceptions.getCode(), AppResultCode.Exceptions.getMsg());
129             ex.printStackTrace();
130         }
131         appResponseBase.setRespData(loginResponse);
132         String result = JSON.toJSONString(appResponseBase);
133         logger.info("客户端{}登录响应:{}", client.getSessionId(), result);
134         //响应客户端
135         request.sendAckData(result);
136     }
137 
138     /**
139      * 交易下单事件
140      * @param callPayRequest 下单请求信息实体
141      * @return
142      */
143     public static String sendByPayEvent(CallPayRequest callPayRequest) {
144         String result = "";
145         //获取客户端信息
146         SocketInstance instance = SocketInstance.getSocketInstance();
147         SocketIOClient client = instance.getClientSocket(callPayRequest.getEnCode());
148         if (client != null) {
149             //请求报文
150             String requestParam = JSON.toJSONString(callPayRequest);
151             //请求下单
152             client.sendEvent(SocketConstants.SocketEvent.PAY, new AckCallback<String>(String.class) {
153                 @Override
154                 public void onSuccess(String s) {
155                     //响应信息
156                     System.out.println("ack from client: " + client.getSessionId() + " data: " + s.toString());
157                 }
158             }, requestParam);
159 
160         } else {
161             //客户端已断开连接
162 
163         }
164         return result;
165     }
166 }

 

 1 @Component
 2 @Order(value = 1)
 3 public class MyCommandLineRunner implements CommandLineRunner {
 4 
 5     private final SocketIOServer server;
 6 
 7     @Autowired
 8     public MyCommandLineRunner(SocketIOServer server) {
 9         System.out.println("初始化MyCommandLineRunner");
10         this.server = server;
11     }
12 
13     @Override
14     public void run(String... args) {
15         try {
16             server.start();
17             System.out.println("socket.io启动成功!");
18         } catch (Exception ex) {
19             ex.printStackTrace();
20         }
21     }
22 }

 

 1 public class SocketConstants {
 2 
 3     /**
 4      * Socket事件类
 5      */
 6     public class SocketEvent {
 7 
 8         /**
 9          * 效验连接状况
10          */
11         public static final String HEALTH_CHECK = "HEALTH_CHECK";
12 
13         /**
14          * 消息接收事件名称
15          */
16         public static final String MESSAGE = "message";
17 
18         /**
19          * 登录事件名称
20          */
21         public static final String LOGIN = "LOGIN";
22 
23         /**
24          * 获取交易要素事件名称
25          */
26         public static final String QUERY_PAY_FIELDS = "QUERY_PAY_FIELDS";
27 
28         /**
29          * 创建订单事件名称
30          */
31         public static final String CREATE_ORDER = "CREATE_ORDER";
32 
33         /**
34          * 监控订单状态事件名称
35          */
36         public static final String CHECK_ORDER_STATUS = "CHECK_ORDER_STATUS";
37 
38         /**
39          * 获取订单事件名称
40          */
41         public static final String QUERY_ORDER = "QUERY_ORDER";
42 
43         /**
44          * 支付事件名称
45          */
46         public static final String PAY = "PAY";
47     }
48 }

 

 1 public class SocketInstance {
 2 
 3     /**
 4      * 客户端Socket连接对象容器
 5      */
 6     private static Map<String, SocketIOClient> socketClients = null;
 7 
 8     /**
 9      * 私有构造
10      */
11     private SocketInstance() {
12         //从缓存中获取socketClients
13         socketClients = new HashMap<>();
14     }
15 
16     /**
17      * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为SocketInstanceHolder的类,只有在SocketInstance.getSocketInstance()中调用,
18      * 由于私有的属性,他人无法使用SocketInstanceHolder,不调用SocketInstance.getSocketInstance()就不会创建实例。
19      * 优点:达到了lazy loading的效果,即按需创建实例。
20      * 无法适用于分布式集群部署
21      */
22     private static class SocketInstanceHolder {
23         /**
24          * 创建全局唯一实例
25          */
26         private final static SocketInstance instance = new SocketInstance();
27     }
28 
29     /**
30      * 获取全局唯一实例
31      *
32      * @return SocketInstance对象
33      */
34     public static SocketInstance getSocketInstance() {
35         return SocketInstanceHolder.instance;
36     }
37 
38     /**
39      * 新增客户端连接到容器
40      *
41      * @param encode         设备En号
42      * @param socketIOClient 客户端socket对象
43      */
44     public void insertSocketClient(String encode, SocketIOClient socketIOClient) {
45         SocketIOClient oldSocketIOClient = socketClients.get(encode);
46         if (oldSocketIOClient != null) {
47             try {
48                 //关闭客户端连接
49                 oldSocketIOClient.disconnect();
50             } catch (Exception ex) {
51                 ex.printStackTrace();
52             }
53         }
54         socketClients.put(encode, socketIOClient);
55     }
56 
57     /**
58      * 获取客户端Socket对象
59      *
60      * @param encode 设备encode
61      * @return 客户端Socket对象
62      */
63     public
登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认