Commit e86f1cc0 by 杜祥龙

fix 接受消息

Signed-off-by: sdvdxl杜龙少 <sdvdxl@163.com>
parent c7de935e
...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
...@@ -12,7 +13,7 @@ import java.util.concurrent.TimeUnit; ...@@ -12,7 +13,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
public class GatewayDevice { public class GatewayDevice {
private static final Executor EXECUTOR = Executors.newSingleThreadExecutor(); private static final Executor EXECUTOR = Executors.newCachedThreadPool();
// 设备id // 设备id
private static final String DEV_TID = ""; private static final String DEV_TID = "";
// 产品密钥 // 产品密钥
...@@ -26,6 +27,7 @@ public class GatewayDevice { ...@@ -26,6 +27,7 @@ public class GatewayDevice {
public static void main(String[] args) public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException { throws IOException, InterruptedException, ExecutionException, TimeoutException {
CountDownLatch countDownLatch = new CountDownLatch(1);
TcpClient tcpClient = new TcpClient("hub.hekr.me", 83).connect(); TcpClient tcpClient = new TcpClient("hub.hekr.me", 83).connect();
String randomKey = null; String randomKey = null;
...@@ -39,9 +41,11 @@ public class GatewayDevice { ...@@ -39,9 +41,11 @@ public class GatewayDevice {
message.put("msgId", tcpClient.getMsgId()); message.put("msgId", tcpClient.getMsgId());
params = new HashMap<>(); params = new HashMap<>();
params.put("devTid", DEV_TID); params.put("devTid", DEV_TID);
params.put("prodKey", PROD_KEY);
message.put("params", params); message.put("params", params);
tcpClient.send(JSON.toJSONString(message) + "\n"); tcpClient.send(JSON.toJSONString(message) + "\n");
String respMsg = tcpClient.receive(MAX_WAIT_TIME_MILLS); String respMsg = tcpClient.receive(MAX_WAIT_TIME_MILLS);
System.out.println(respMsg);
JSONObject respMap = JSON.parseObject(respMsg); JSONObject respMap = JSON.parseObject(respMsg);
boolean success = respMap.getIntValue("code") == 200; boolean success = respMap.getIntValue("code") == 200;
if (!success) { if (!success) {
...@@ -50,7 +54,7 @@ public class GatewayDevice { ...@@ -50,7 +54,7 @@ public class GatewayDevice {
} }
// 获取 randomKey // 获取 randomKey
randomKey = (String) respMap.get("randomKey"); randomKey = (String) respMap.getJSONObject("params").get("randomKey");
} }
// 网关登录 // 网关登录
...@@ -58,6 +62,7 @@ public class GatewayDevice { ...@@ -58,6 +62,7 @@ public class GatewayDevice {
params.put("devTid", DEV_TID); params.put("devTid", DEV_TID);
// 如果启用两步认证,需要加上authKey // 如果启用两步认证,需要加上authKey
if (GATEWAY_DEVICE_TWO_FACTOR_AUTHENTICATION) { if (GATEWAY_DEVICE_TWO_FACTOR_AUTHENTICATION) {
System.out.println("randomKey:" + randomKey);
String authKey = Util.MD5(randomKey + DEV_TID + PRIV_KEY); String authKey = Util.MD5(randomKey + DEV_TID + PRIV_KEY);
params.put("authKey", authKey); params.put("authKey", authKey);
} }
...@@ -67,6 +72,7 @@ public class GatewayDevice { ...@@ -67,6 +72,7 @@ public class GatewayDevice {
message.put("params", params); message.put("params", params);
tcpClient.send(JSON.toJSONString(message) + "\n"); tcpClient.send(JSON.toJSONString(message) + "\n");
String respMsg = tcpClient.receive(MAX_WAIT_TIME_MILLS); String respMsg = tcpClient.receive(MAX_WAIT_TIME_MILLS);
System.out.println(respMsg);
JSONObject resp = JSON.parseObject(respMsg); JSONObject resp = JSON.parseObject(respMsg);
if (resp.getIntValue("code") != 200) { if (resp.getIntValue("code") != 200) {
System.out.println("gateway login failed,resp:" + respMsg); System.out.println("gateway login failed,resp:" + respMsg);
...@@ -74,12 +80,14 @@ public class GatewayDevice { ...@@ -74,12 +80,14 @@ public class GatewayDevice {
} }
// 登录成功,这里订阅消息 // 登录成功,这里订阅消息
subcribeMessage(tcpClient); subscribeMessage(tcpClient);
// TODO 发送业务数据 // TODO 发送业务数据
// 循环定时发送心跳 // 循环定时发送心跳
loopSendHeartbeat(tcpClient); loopSendHeartbeat(tcpClient);
countDownLatch.await();
} }
private static void loopSendHeartbeat(TcpClient tcpClient) { private static void loopSendHeartbeat(TcpClient tcpClient) {
...@@ -91,7 +99,7 @@ public class GatewayDevice { ...@@ -91,7 +99,7 @@ public class GatewayDevice {
heartbeatMessage.put("action", "heartbeat"); heartbeatMessage.put("action", "heartbeat");
heartbeatMessage.put("msgId", tcpClient.getMsgId()); heartbeatMessage.put("msgId", tcpClient.getMsgId());
try { try {
tcpClient.send(JSON.toJSONString(heartbeatMessage)); tcpClient.send(JSON.toJSONString(heartbeatMessage) + "\n");
} catch (IOException e) { } catch (IOException e) {
System.out.println(e.getMessage()); System.out.println(e.getMessage());
} }
...@@ -100,10 +108,12 @@ public class GatewayDevice { ...@@ -100,10 +108,12 @@ public class GatewayDevice {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
System.out.println("tcp client closed");
}); });
} }
private static void subcribeMessage(TcpClient tcpClient) { private static void subscribeMessage(TcpClient tcpClient) {
EXECUTOR.execute( EXECUTOR.execute(
new ReceiveMessageTask( new ReceiveMessageTask(
tcpClient, tcpClient,
......
...@@ -14,7 +14,6 @@ import java.util.concurrent.TimeoutException; ...@@ -14,7 +14,6 @@ import java.util.concurrent.TimeoutException;
/** 描述 Created by du on 2017/8/6. */ /** 描述 Created by du on 2017/8/6. */
public class TcpClient { public class TcpClient {
private static final int TIMEOUT_MILL = 20000;
private Socket client; private Socket client;
private String host; private String host;
private int port; private int port;
...@@ -46,7 +45,7 @@ public class TcpClient { ...@@ -46,7 +45,7 @@ public class TcpClient {
public TcpClient connect(int timeout) throws IOException { public TcpClient connect(int timeout) throws IOException {
client.setKeepAlive(true); client.setKeepAlive(true);
client.setSoTimeout(TIMEOUT_MILL); client.setSoTimeout(0);
client.connect(new InetSocketAddress(host, port), timeout); client.connect(new InetSocketAddress(host, port), timeout);
outToServer = new DataOutputStream(client.getOutputStream()); outToServer = new DataOutputStream(client.getOutputStream());
...@@ -62,6 +61,7 @@ public class TcpClient { ...@@ -62,6 +61,7 @@ public class TcpClient {
throw new RuntimeException("output channel closed"); throw new RuntimeException("output channel closed");
} }
System.out.println("SEND => " + message);
outToServer.writeBytes(message); outToServer.writeBytes(message);
return this; return this;
} }
...@@ -107,7 +107,8 @@ public class TcpClient { ...@@ -107,7 +107,8 @@ public class TcpClient {
*/ */
public String receive(int timeoutMill) public String receive(int timeoutMill)
throws InterruptedException, ExecutionException, TimeoutException { throws InterruptedException, ExecutionException, TimeoutException {
return new FutureTask<String>(() -> inFromServer.readLine()) FutureTask<String> futureTask = new FutureTask<>(() -> inFromServer.readLine());
.get(timeoutMill, TimeUnit.MICROSECONDS); futureTask.run();
return futureTask.get(timeoutMill, TimeUnit.MILLISECONDS);
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment