使用java中Java-WebSocket做客户端
<!-- 实际使用包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!--websocket作为客户端--> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.3.5</version> </dependency>
package com.xie.websocket; import com.alibaba.fastjson.JSONArray; import lombok.extern.slf4j.Slf4j; import javax.websocket.*; import java.io.IOException; /** * @Description WebSocket Client * @Date 2022-03-31 15:35 * @Author xie */ @Slf4j @ClientEndpoint() public class WebSocketClient { // 业务service private TestService testService = (TestService) ApplicationContextHandle.getBean(TestService.class); private WebSocketStart webSocketStart = (WebSocketStart) ApplicationContextHandle.getBean(WebSocketStart.class); @OnOpen public void onOpen(Session session) { log.info("客户端建立连接......"); } @OnMessage public void onMessage(Session session, String message) { try { log.info("客户端收到消息:{}......", message); // 转化为自己bean对象 TestData data = JSONArray.parseObject(message, TestData.class); // 处理数据 processData(data); } catch (Exception e) { e.printStackTrace(); } } @OnClose public void onClose() { log.info("与服务器端断开连接......"); try { log.info("开始尝试重新连接......"); webSocketStart.start(); } catch (Exception e) { e.printStackTrace(); log.info("重新连接失败,请检查网络!"); } } private static void sendMsg(Session session, String msg) throws IOException { session.getBasicRemote().sendText(msg); } @OnError public void one rror(Session session, Throwable error){ log.error("发生错误......"); error.printStackTrace(); } /** * 处理数据 */ private void processData(TestData data) { /** * 处理数据。。。。 */ testService.batchSaveTestData(data); } }
package com.xie.websocket; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; /** * @Description: Application 上下文 **/ @Configuration public class ApplicationContextHandle implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { ApplicationContextHandle.applicationContext = applicationContext; } public static Object getBean(Class c) throws BeansException { return applicationContext.getBean(c); } // 获取当前环境 public static String getActiveProfile() { return applicationContext.getEnvironment().getActiveProfiles()[0]; } }
package com.xie.websocket; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import java.io.IOException; import java.net.URI; /** * @Description * @Date 2022-03-31 15:43 * @Author xie */ @Slf4j @Component public class WebSocketStart { @Value("${xx.xx.websocket.uri}") private String uri; private static Session session; /** * 消息发送事件 */ private static long date; private void connect() { WebSocketContainer container = null; try { container = ContainerProvider.getWebSocketContainer(); URI r = URI.create(uri); session = container.connectToServer(WebSocketClient.class, r); } catch (Exception e) { e.printStackTrace(); } } public void start() { connect(); new Thread(new KeepAlive()).start(); try { for (int i = 0; i < 5; i++) { /** *注意:此处对session做了同步处理, * 因为下文中发送心跳包也是用的此session, * 不用synchronized做同步处理会报 * Exception in thread "Thread-5" java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method * 错误 */ synchronized (WebSocketStart.session) { WebSocketStart.session.getBasicRemote().sendText("javaclient"); } date = System.currentTimeMillis(); Thread.sleep(30000); } } catch (Exception e) { log.info("客户端出错......"); e.printStackTrace(); } } /** * 内部类,用来客户端给服务单发送心跳包维持连接 */ class KeepAlive implements Runnable { @Override public void run() { while (true) { if (System.currentTimeMillis() - date > 30000) { try { log.info("发送心跳包......"); synchronized (WebSocketStart.session) { WebSocketStart.session.getBasicRemote().sendText("keepalive"); } date = System.currentTimeMillis(); } catch (IOException e) { log.info("维持心跳包出錯......"); e.printStackTrace(); } } else { try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }