内置
飞企标准协议(直连设备)
直连设备接入,标准数据格式无缝进行编码、解码操作,无需转换。
HTTP 协议实现
/**
 * Standard-protocol codec bound to the HTTP transport.
 */
public class StandardHttpProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload by mapping its JSON body straight onto the
     * metadata bean that matches the reply message carried by the context.
     * Every identifier is passed through {@code ProtocolUtil.setPrefix} together
     * with the codec parameters (presumably applying the configured id prefix).
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code HttpPayload}) and the codec parameters
     * @return the reply message with the decoded metadata added; for a
     *         {@code FuncReplyMessage} a newly parsed instance is returned; any
     *         other reply message is handed back untouched
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        // Device data
        if (target instanceof DeviceReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
            device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
            return ((DeviceReplyMessage) target).addDeviceMetadata(device);
        }

        // Event data
        if (target instanceof EventReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
            event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
            event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
            return ((EventReplyMessage) target).addEventMetadata(event);
        }

        // Function reply: the payload itself is the reply message
        if (target instanceof FuncReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
            parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
            return parsed;
        }

        // Any other source is returned without processing
        return target;
    }

    /**
     * Encodes a function call for HTTP. The state is always set to
     * {@code FuncStatusType.TO_BE_RETURNED} because system-scheduled downlink is
     * not supported over HTTP.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     not used by this codec
     * @return an {@code HttpIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());
        command.setDeviceId(
                ProtocolUtil.setPrefix(encoderCodecContext.getCodecMetadataParam(), command.getDeviceId()));
        return new HttpIssue(command);
    }
}
MQTT 协议实现
/**
 * Standard-protocol codec bound to the MQTT transport.
 */
public class StandardMqttProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload by mapping its JSON body straight onto the
     * metadata bean that matches the reply message carried by the context.
     * Every identifier is passed through {@code ProtocolUtil.setPrefix} together
     * with the codec parameters (presumably applying the configured id prefix).
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code MqttPayload}) and the codec parameters
     * @return the reply message with the decoded metadata added; for a
     *         {@code FuncReplyMessage} a newly parsed instance is returned; any
     *         other reply message is handed back untouched
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        // Device data
        if (target instanceof DeviceReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
            device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
            return ((DeviceReplyMessage) target).addDeviceMetadata(device);
        }

        // Event data
        if (target instanceof EventReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
            event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
            event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
            return ((EventReplyMessage) target).addEventMetadata(event);
        }

        // Function reply: the payload itself is the reply message
        if (target instanceof FuncReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
            parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
            return parsed;
        }

        // Any other source is returned without processing
        return target;
    }

    /**
     * Encodes a function call for MQTT. MQTT supports system-scheduled downlink,
     * so the state is always set to {@code FuncStatusType.TO_BE_SENT}.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        command.setState(FuncStatusType.TO_BE_SENT.getCode());
        command.setDeviceId(
                ProtocolUtil.setPrefix(encoderCodecContext.getCodecMetadataParam(), command.getDeviceId()));
        return new MqttIssue(command);
    }
}
注册器
/**
 * Registrar of the standard protocol: contributes one MQTT codec and one HTTP
 * codec, each with its configuration metadata.
 */
public class StandardProtocolRegister implements ProtocolRegister {

    /** MQTT configuration metadata; only the optional identifier-prefix property is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /** HTTP configuration metadata; only the optional identifier-prefix property is declared. */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "目前只支持上行,暂不支持下行")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * Builds the protocol container and attaches the MQTT pair first, then the
     * HTTP pair (metadata followed by its codec in both cases).
     *
     * @return the populated container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "StandardProtocolRegister", "标准协议", "标准协议支持MQTT、HTTP,其中HTTP只支持上行");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG)
                .addProtocolCodec(new StandardMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG)
                .addProtocolCodec(new StandardHttpProtocolCodec());

        return container;
    }
}
飞企边缘网关(网关设备)
边缘网关接入,通过网关编排引擎进行标准数据格式的转换,同时提供直接进入网关配置页面的入口。
HTTP 协议实现
/**
 * Edge-gateway codec bound to the HTTP transport.
 */
public class EdgeBoxHttpProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload by mapping its JSON body straight onto the
     * metadata bean that matches the reply message carried by the context.
     * Every identifier is passed through {@code ProtocolUtil.setPrefix} together
     * with the codec parameters (presumably applying the configured id prefix).
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code HttpPayload}) and the codec parameters
     * @return the reply message with the decoded metadata added; for a
     *         {@code FuncReplyMessage} a newly parsed instance is returned; any
     *         other reply message is handed back untouched
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        // Device data
        if (target instanceof DeviceReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
            device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
            return ((DeviceReplyMessage) target).addDeviceMetadata(device);
        }

        // Event data
        if (target instanceof EventReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
            event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
            event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
            return ((EventReplyMessage) target).addEventMetadata(event);
        }

        // Function reply: the payload itself is the reply message
        if (target instanceof FuncReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
            parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
            return parsed;
        }

        // Any other source is returned without processing
        return target;
    }

    /**
     * Encodes a function call for HTTP. The state is always set to
     * {@code FuncStatusType.TO_BE_RETURNED} because system-scheduled downlink is
     * not supported over HTTP. When the function matches
     * {@code SubsystemFunc.PLATFORM_URL}, the result is set to the configured
     * {@code host} followed by {@code ?access_token=} and a token obtained from
     * {@code EdgeBoxUtil.getToken} with the configured username and password.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     not used by this codec
     * @return an {@code HttpIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();

        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());

        // Single sign-on address of the edge gateway service
        if (SubsystemFunc.PLATFORM_URL.match(command.getFunc())) {
            String host = codecParam.getStr("host");
            String token = EdgeBoxUtil.getToken(
                    codecParam.getStr("username"), codecParam.getStr("password"), host);
            command.setRes(host + "?access_token=" + token);
        }

        command.setDeviceId(
                ProtocolUtil.setPrefix(encoderCodecContext.getCodecMetadataParam(), command.getDeviceId()));
        return new HttpIssue(command);
    }
}
MQTT 协议实现
/**
 * Edge-gateway codec bound to the MQTT transport.
 */
public class EdgeBoxMqttProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload by mapping its JSON body straight onto the
     * metadata bean that matches the reply message carried by the context.
     * Every identifier is passed through {@code ProtocolUtil.setPrefix} together
     * with the codec parameters (presumably applying the configured id prefix).
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code MqttPayload}) and the codec parameters
     * @return the reply message with the decoded metadata added; for a
     *         {@code FuncReplyMessage} a newly parsed instance is returned; any
     *         other reply message is handed back untouched
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        // Device data
        if (target instanceof DeviceReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
            device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
            return ((DeviceReplyMessage) target).addDeviceMetadata(device);
        }

        // Event data
        if (target instanceof EventReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
            event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
            event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
            return ((EventReplyMessage) target).addEventMetadata(event);
        }

        // Function reply: the payload itself is the reply message
        if (target instanceof FuncReplyMessage) {
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
            parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
            return parsed;
        }

        // Any other source is returned without processing
        return target;
    }

    /**
     * Encodes a function call for MQTT. MQTT supports system-scheduled downlink,
     * so the state is always set to {@code FuncStatusType.TO_BE_SENT}.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        command.setState(FuncStatusType.TO_BE_SENT.getCode());
        command.setDeviceId(
                ProtocolUtil.setPrefix(encoderCodecContext.getCodecMetadataParam(), command.getDeviceId()));
        return new MqttIssue(command);
    }
}
特殊工具类
/**
 * HTTP helpers for talking to the edge gateway service.
 */
public class EdgeBoxUtil {

    /**
     * Requests an access token from {@code url + "/auth/token"} using a
     * password grant with the fixed client id {@code node-red-admin} and scope
     * {@code *}.
     *
     * @param username gateway account name
     * @param password gateway account password
     * @param url      base address of the gateway service
     * @return the {@code access_token} value of the response, never null or empty
     * @throws BaseException ({@code BaseErrors.ERR_1007}, carrying the raw
     *                       response) when the response is not a JSON object or
     *                       does not contain a usable {@code access_token}
     */
    public static String getToken(String username, String password, String url) {
        JSONObject credentials = JSONUtil.createObj()
                .set("client_id", "node-red-admin")
                .set("grant_type", "password")
                .set("scope", "*")
                .set("username", username)
                .set("password", password);
        String result = post(url + "/auth/token", credentials, "application/json");
        if (JSONUtil.isJsonObj(result)) {
            String token = JSONUtil.parseObj(result).getStr("access_token");
            // A JSON error body (e.g. rejected credentials) carries no token. Returning
            // null here would make callers build a URL ending in "access_token=null".
            if (token != null && !token.isEmpty()) {
                return token;
            }
        }
        throw new BaseException(BaseErrors.ERR_1007, result);
    }

    /**
     * Sends a UTF-8 POST request to a third-party endpoint and returns the
     * response body as text.
     *
     * @param url         target address
     * @param body        JSON document sent as the request body
     * @param contentType value used for the request content type
     * @return the response body
     */
    public static String post(String url, JSONObject body, String contentType) {
        return HttpRequest.post(url)
                .charset("UTF-8")
                .body(body.toString(), contentType)
                .execute().body();
    }
}
注册器
/**
 * Registrar of the edge-gateway protocol: contributes one MQTT codec and one
 * HTTP codec, each with its configuration metadata.
 */
public class EdgeBoxProtocolRegister implements ProtocolRegister {

    /** MQTT configuration metadata; only the optional identifier-prefix property is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * HTTP configuration metadata: gateway account, password and service
     * address, plus the optional identifier-prefix property.
     */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "目前只支持上行,暂不支持下行")
                    .addProperty("username", "账号", "边缘网关账号")
                    .addProperty("password", "密码", "边缘网关密码")
                    .addProperty("host", "网关服务地址", "网关服务地址")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * Builds the protocol container and attaches the MQTT pair first, then the
     * HTTP pair (metadata followed by its codec in both cases).
     *
     * @return the populated container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "EdgeBoxProtocolRegister", "标准边缘网关协议", "边缘网关协议支持MQTT、HTTP,其中HTTP只支持上行");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG)
                .addProtocolCodec(new EdgeBoxMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG)
                .addProtocolCodec(new EdgeBoxHttpProtocolCodec());

        return container;
    }
}
飞企物联平台(子系统设备)
飞企物联平台接入,通过实现标准编解码协议接口,完成不同通信协议的编码、解码操作,同时提供设备信息、告警信息等数据获取的功能实现。
设备上报数据格式:参考“飞企物联平台设备上报数据格式”
告警上报数据格式:参考“飞企物联平台告警上报数据格式”
HTTP 协议实现
/**
 * Jetlinks platform codec bound to the HTTP transport.
 */
public class JetlinksHttpProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload. Device and event reply messages receive
     * the list produced by the matching {@code JetlinksUtil} decoder; function
     * reply messages and any other source are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code HttpPayload}) and the codec parameters
     * @return the reply message, with decoded entries added where applicable
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();

        if (target instanceof DeviceReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((DeviceReplyMessage) target)
                    .addList(JetlinksUtil.decoderDeviceReplyMessage(codecParam, body));
        }

        if (target instanceof EventReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((EventReplyMessage) target)
                    .addList(JetlinksUtil.decoderEventReplyMessage(codecParam, body));
        }

        // Function replies are not handled yet; other sources are passed through as-is
        return target;
    }

    /**
     * Executes the built-in HTTP functions against the platform and pushes
     * list/detail results through the callback. Whatever the function, the
     * state is finally set to {@code FuncStatusType.TO_BE_RETURNED}.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     receives the device or event reply built from the platform data
     * @return an {@code HttpIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();
        String func = command.getFunc();

        if (SubsystemFunc.GET_DEVICE_LIST.match(func)) {
            // Fetch the device list and push it through the callback
            DeviceReplyMessage reply = new DeviceReplyMessage()
                    .addList(JetlinksUtil.getDeviceList(codecParam, command.getReq()));
            encoderCallback.reply(encoderCodecContext, reply);
        } else if (SubsystemFunc.GET_DEVICE_INFO.match(func)) {
            // Fetch one device's details and push them through the callback
            DeviceReplyMessage reply = new DeviceReplyMessage()
                    .addDeviceMetadata(JetlinksUtil.getDeviceInfo(codecParam, command.getReq()));
            encoderCallback.reply(encoderCodecContext, reply);
        } else if (SubsystemFunc.GET_EVENT_LIST.match(func)) {
            // Fetch the alarm list and push it through the callback
            EventReplyMessage reply = new EventReplyMessage()
                    .addList(JetlinksUtil.getEventList(codecParam, command.getReq()));
            encoderCallback.reply(encoderCodecContext, reply);
        } else if (SubsystemFunc.PLATFORM_URL.match(func)) {
            // The sign-on address is the configured platform host
            command.setRes(codecParam.getStr("platHost"));
        } else if (SubsystemFunc.VIDEO_STREAM_URL.match(func)) {
            // Video stream address (HLS) - not implemented
        }
        // Further functions can be added to the chain above

        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());
        return new HttpIssue(command);
    }
}
MQTT 协议实现
/**
 * Jetlinks platform codec bound to the MQTT transport.
 */
public class JetlinksMqttProtocolCodec implements ProtocolCodec {

    /**
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload. Device and event reply messages receive
     * the list produced by the matching {@code JetlinksUtil} decoder; function
     * reply messages and any other source are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the payload
     *                            (cast to {@code MqttPayload}) and the codec parameters
     * @return the reply message, with decoded entries added where applicable
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();

        if (target instanceof DeviceReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((DeviceReplyMessage) target)
                    .addList(JetlinksUtil.decoderDeviceReplyMessage(codecParam, body));
        }

        if (target instanceof EventReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((EventReplyMessage) target)
                    .addList(JetlinksUtil.decoderEventReplyMessage(codecParam, body));
        }

        // Function replies are not handled yet; other sources are passed through as-is
        return target;
    }

    /**
     * Encodes a function call for MQTT. MQTT supports system-scheduled downlink,
     * so the state is always set to {@code FuncStatusType.TO_BE_SENT}.
     *
     * @param encoderCodecContext context supplying the function message and codec parameters
     * @param encoderCallback     not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        command.setState(FuncStatusType.TO_BE_SENT.getCode());
        command.setDeviceId(
                ProtocolUtil.setPrefix(encoderCodecContext.getCodecMetadataParam(), command.getDeviceId()));
        return new MqttIssue(command);
    }
}
特殊工具类
/**
 * Helpers for the Jetlinks platform: signed HTTP calls, decoding of pushed
 * device/alarm data and retrieval of device and alarm information.
 */
public class JetlinksUtil {

    /** Value of the {@code status} field that this class treats as success. */
    private static final int STATUS_OK = 200;

    /**
     * Sends a signed POST request. The signature is the MD5 hex digest of
     * {@code body + timestamp + secureKey} and travels in the {@code X-Sign}
     * header next to {@code X-Timestamp} and {@code X-Client-Id}.
     *
     * @param secureKey   secret appended to the signed text
     * @param clientId    value of the {@code X-Client-Id} header
     * @param url         target address
     * @param body        JSON request body
     * @param contentType value used for the request content type
     * @return the response body
     */
    public static String post(String secureKey, String clientId, String url, JSONObject body, String contentType) {
        // Serialise once so the signed text and the transmitted text are the same string.
        String payload = body.toString();
        String timestamp = String.valueOf(System.currentTimeMillis());
        String sign = DigestUtils.md5Hex(payload + timestamp + secureKey);
        return HttpRequest.post(url)
                .header("X-Timestamp", timestamp, false)
                .header("X-Client-Id", clientId, false)
                .header("X-Sign", sign, false)
                .charset("UTF-8")
                .body(payload, contentType)
                .execute().body();
    }

    /**
     * Sends a signed GET request. The signature is the MD5 hex digest of
     * {@code k1=v1&k2=v2... + timestamp + secureKey}, with the parameters
     * ordered by key.
     *
     * @param secureKey secret appended to the signed text
     * @param clientId  value of the {@code X-Client-Id} header
     * @param url       target address
     * @param formMap   request parameters
     * @return the response body
     */
    public static String getForm(String secureKey, String clientId, String url, Map<String, Object> formMap) {
        // Sign over keys in ascending order: the iteration order of the caller's map
        // (a HashMap in this class) is unspecified, so multi-parameter requests would
        // otherwise produce a signature the platform cannot reproduce.
        Map<String, Object> sorted = new java.util.TreeMap<>(formMap);
        StringBuilder query = new StringBuilder();
        for (Map.Entry<String, Object> entry : sorted.entrySet()) {
            if (query.length() > 0) {
                query.append('&');
            }
            query.append(entry.getKey()).append('=').append(entry.getValue());
        }
        String timestamp = String.valueOf(System.currentTimeMillis());
        String sign = DigestUtils.md5Hex(query + timestamp + secureKey);
        return HttpRequest.get(url)
                .header("X-Timestamp", timestamp, false)
                .header("X-Client-Id", clientId, false)
                .header("X-Sign", sign, false)
                .form(formMap)
                .execute().body();
    }

    /**
     * Maps a platform alarm name to an event type code: the first
     * {@code EventType} whose description is contained in the name wins.
     *
     * @param eventName alarm name reported by the platform, may be null
     * @return the matching type code, or the code of {@code EventType.OTHER_ALARM}
     *         when nothing matches or the name is null
     */
    public static String decoderEventType(String eventName) {
        // Pushed alarms may lack a name; fall through to the default instead of throwing.
        if (eventName != null) {
            for (EventType candidate : EventType.values()) {
                if (eventName.contains(candidate.getDesc())) {
                    return candidate.getCode();
                }
            }
        }
        return EventType.OTHER_ALARM.getCode();
    }

    /**
     * Maps a platform device state to a {@code DeviceStatusType} code.
     * Source values: {@code notActive} (not enabled), {@code offline}, {@code online}.
     *
     * @param state platform state value
     * @return NOT_ENABLED for {@code notActive}, OFFLINE for {@code offline},
     *         ONLINE for everything else
     */
    public static Integer decoderDeviceState(String state) {
        if ("notActive".equals(state)) {
            return DeviceStatusType.NOT_ENABLED.getCode();
        }
        if ("offline".equals(state)) {
            return DeviceStatusType.OFFLINE.getCode();
        }
        // Anything else counts as online
        return DeviceStatusType.ONLINE.getCode();
    }

    /**
     * Maps a platform device type to a device type code. No mapping is
     * implemented yet: the input is returned unchanged.
     *
     * @param type platform type value
     * @return {@code type} as given
     */
    public static String decoderDeviceType(String type) {
        return type;
    }

    /**
     * Decodes pushed device data. Entries without a non-blank {@code deviceId}
     * are skipped.
     *
     * @param param codec parameters handed to {@code ProtocolUtil.setPrefix}
     * @param data  pushed data (original JSON)
     * @return the decoded devices, possibly empty
     */
    public static List<DeviceMetadata> decoderDeviceReplyMessage(JSONObject param, JSONObject data) {
        List<DeviceMetadata> devices = new ArrayList<>();
        JSONArray entries = toEntries(data);
        for (int i = 0; i < entries.size(); i++) {
            JSONObject entry = entries.getJSONObject(i);
            String deviceId = entry.getStr("deviceId");
            if (StrUtil.isNotBlank(deviceId)) {
                DeviceMetadata device = new DeviceMetadata();
                device.setId(ProtocolUtil.setPrefix(param, deviceId));
                device.setProperties(entry.getJSONObject("properties"));
                devices.add(device);
            }
        }
        return devices;
    }

    /**
     * Decodes pushed alarm data. Entries without a non-blank {@code alarmId}
     * are skipped.
     *
     * @param param codec parameters handed to {@code ProtocolUtil.setPrefix}
     * @param data  pushed data (original JSON)
     * @return the decoded events, possibly empty
     */
    public static List<EventMetadata> decoderEventReplyMessage(JSONObject param, JSONObject data) {
        List<EventMetadata> events = new ArrayList<>();
        JSONArray entries = toEntries(data);
        for (int i = 0; i < entries.size(); i++) {
            JSONObject entry = entries.getJSONObject(i);
            String eventId = entry.getStr("alarmId");
            if (StrUtil.isNotBlank(eventId)) {
                EventMetadata event = new EventMetadata();
                event.setId(ProtocolUtil.setPrefix(param, eventId));
                event.setDeviceId(ProtocolUtil.setPrefix(param, entry.getStr("deviceId")));
                event.setName(entry.getStr("alarmName"));
                event.setType(decoderEventType(event.getName()));
                // NOTE(review): getEventList reads the time from "alarmTime" and uses
                // "alarmData" as details; confirm against the push format whether this
                // key should be "alarmTime" as well.
                event.setCreateTime(entry.getDate("alarmData"));
                event.setProperties(entry.getJSONObject("properties"));
                events.add(event);
            }
        }
        return events;
    }

    /**
     * Queries the platform device list via {@code apiHost + "/api/v1/device/_query"}.
     *
     * @param param codec parameters ({@code secureKey}, {@code clientId}, {@code apiHost})
     * @param req   request parameters; currently not evaluated
     * @return the devices reported by the platform
     * @throws BaseException ({@code BaseErrors.ERR_1007}, carrying the raw response)
     *                       when the response is not a successful, well-formed result
     */
    public static List<DeviceMetadata> getDeviceList(JSONObject param, String req) {
        // TODO: honour deviceType, pageIndex and pageSize from req.
        // NOTE(review): the original comment states "query all devices (no paging)",
        // yet the request sends paging="true"; confirm which one is intended.
        JSONObject query = JSONUtil.createObj().set("paging", "true");
        String result = post(param.getStr("secureKey"), param.getStr("clientId"),
                param.getStr("apiHost") + "/api/v1/device/_query", query, "application/json");
        JSONArray rows = extractRows(result);
        if (rows == null) {
            throw new BaseException(BaseErrors.ERR_1007, result);
        }
        List<DeviceMetadata> devices = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            JSONObject row = rows.getJSONObject(i);
            DeviceMetadata device = new DeviceMetadata();
            device.setId(ProtocolUtil.setPrefix(param, row.getStr("id")));
            device.setName(row.getStr("name"));
            device.setClassify(decoderDeviceType(row.getStr("productId")));
            device.setState(decoderDeviceState(row.getStr("state")));
            devices.add(device);
        }
        return devices;
    }

    /**
     * Fetches the latest property values of one device via
     * {@code apiHost + "/api/v1/device/{id}/properties/_latest"}.
     *
     * @param param codec parameters ({@code secureKey}, {@code clientId}, {@code apiHost})
     * @param req   request parameters as JSON text; {@code deviceId} is required
     * @return a device carrying the requested id and its property map
     * @throws BaseException ({@code BaseErrors.ERR_1007}) when no device id is
     *                       supplied or the response is not a successful result
     */
    public static DeviceMetadata getDeviceInfo(JSONObject param, String req) {
        // The device id is a request argument, so look in req first. The codec
        // parameters are kept as a fallback because that is where it was read before.
        String id = null;
        if (JSONUtil.isJsonObj(req)) {
            id = JSONUtil.parseObj(req).getStr("deviceId");
        }
        if (StrUtil.isBlank(id)) {
            id = param.getStr("deviceId");
        }
        if (StrUtil.isBlank(id)) {
            throw new BaseException(BaseErrors.ERR_1007, "deviceId is required");
        }

        // The platform is addressed with the id stripped by ProtocolUtil.removePrefix,
        // both in the path and in the query; previously the path used the unstripped id.
        String platformId = ProtocolUtil.removePrefix(param, id);
        Map<String, Object> formMap = new HashMap<>();
        formMap.put("deviceId", platformId);
        String result = getForm(param.getStr("secureKey"), param.getStr("clientId"),
                param.getStr("apiHost") + StrUtil.format("/api/v1/device/{}/properties/_latest", platformId),
                formMap);

        if (JSONUtil.isJsonObj(result)) {
            JSONObject response = JSONUtil.parseObj(result);
            JSONArray attributes = isSuccess(response) ? response.getJSONArray("result") : null;
            if (attributes != null) {
                JSONObject properties = new JSONObject();
                for (int i = 0; i < attributes.size(); i++) {
                    JSONObject attribute = attributes.getJSONObject(i);
                    properties.set(attribute.getStr("property"), attribute.getStr("value"));
                }
                DeviceMetadata device = new DeviceMetadata();
                device.setId(id);
                device.setProperties(properties);
                return device;
            }
        }
        throw new BaseException(BaseErrors.ERR_1007, result);
    }

    /**
     * Queries the alarm history via {@code apiHost + "/device/alarm/history/_query"},
     * newest first.
     *
     * @param param codec parameters ({@code secureKey}, {@code clientId}, {@code apiHost})
     * @param req   request parameters as JSON text; optional {@code pageIndex}
     *              (default 1) and {@code pageSize} (default 20)
     * @return the alarms reported by the platform, each in state {@code EventStatusType.PENDING}
     * @throws BaseException ({@code BaseErrors.ERR_1007}, carrying the raw response)
     *                       when the response is not a successful, well-formed result
     */
    public static List<EventMetadata> getEventList(JSONObject param, String req) {
        Integer pageIndex = 1;
        Integer pageSize = 20;
        if (JSONUtil.isJsonObj(req)) {
            JSONObject reqObject = JSONUtil.parseObj(req);
            // Keep the defaults when the request omits a value instead of sending null.
            pageIndex = reqObject.getInt("pageIndex", pageIndex);
            pageSize = reqObject.getInt("pageSize", pageSize);
        }
        JSONObject query = JSONUtil.createObj()
                .set("pageIndex", pageIndex)
                .set("pageSize", pageSize)
                .set("orderBy", "alarmTime desc ");
        String result = post(param.getStr("secureKey"), param.getStr("clientId"),
                param.getStr("apiHost") + "/device/alarm/history/_query", query, "application/json");
        JSONArray rows = extractRows(result);
        if (rows == null) {
            throw new BaseException(BaseErrors.ERR_1007, result);
        }
        List<EventMetadata> events = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            JSONObject row = rows.getJSONObject(i);
            EventMetadata event = new EventMetadata();
            event.setId(ProtocolUtil.setPrefix(param, row.getStr("id")));
            event.setName(row.getStr("alarmName"));
            event.setDeviceId(ProtocolUtil.setPrefix(param, row.getStr("deviceId")));
            event.setType(decoderEventType(event.getName()));
            event.setDetails(row.getStr("alarmData"));
            event.setCreateTime(row.getDate("alarmTime"));
            event.setState(EventStatusType.PENDING.getCode());
            events.add(event);
        }
        return events;
    }

    /** Wraps pushed data in an array so single objects and arrays are iterated alike. */
    private static JSONArray toEntries(JSONObject data) {
        JSONArray entries = JSONUtil.createArray();
        if (JSONUtil.isJsonArray(JSONUtil.toJsonStr(data))) {
            entries = JSONUtil.parseArray(data);
        } else {
            entries.set(data);
        }
        return entries;
    }

    /** Tells whether a response carries the success status, tolerating a missing field. */
    private static boolean isSuccess(JSONObject response) {
        return Integer.valueOf(STATUS_OK).equals(response.getInt("status"));
    }

    /** Returns {@code result.data} of a successful paged response, or null if the response is unusable. */
    private static JSONArray extractRows(String result) {
        if (!JSONUtil.isJsonObj(result)) {
            return null;
        }
        JSONObject response = JSONUtil.parseObj(result);
        if (!isSuccess(response)) {
            return null;
        }
        JSONObject page = response.getJSONObject("result");
        // The row count is taken from the array itself; the reported "total" is not relied upon.
        return page == null ? null : page.getJSONArray("data");
    }
}
注册器
/**
 * Registrar of the Jetlinks protocol: contributes one MQTT codec and one HTTP
 * codec, each with its configuration metadata.
 */
public class JetlinksProtocolRegister implements ProtocolRegister {

    /** MQTT configuration metadata; only the optional identifier-prefix property is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * HTTP configuration metadata: platform credentials and addresses, the
     * optional identifier-prefix property, and the functions callable over HTTP
     * (device list, device info, alarm list, platform address).
     */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "根据jetlinks提供的认证信息,进行http访问调用")
                    .addProperty("clientId", "授权clientId", "Jetlinks物联平台clientId")
                    .addProperty("secureKey", "授权secureKey", "Jetlinks物联平台secureKey")
                    .addProperty("apiHost", "接口服务地址", "Jetlinks物联平台接口服务地址")
                    .addProperty("platHost", "平台地址", "Jetlinks物联平台地址")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false)
                    .addFuncProperty(SubsystemFunc.GET_DEVICE_LIST.getCode(), SubsystemFunc.GET_DEVICE_LIST.getDesc(), "获取设备列表", "无参数")
                    .addFuncProperty(SubsystemFunc.GET_DEVICE_INFO.getCode(), SubsystemFunc.GET_DEVICE_INFO.getDesc(), "获取设备信息", "参数deviceId不能为空")
                    .addFuncProperty(SubsystemFunc.GET_EVENT_LIST.getCode(), SubsystemFunc.GET_EVENT_LIST.getDesc(), "获取告警记录", "参数pageIndex默认1、pageSize默认20,非必填")
                    .addFuncProperty(SubsystemFunc.PLATFORM_URL.getCode(), SubsystemFunc.PLATFORM_URL.getDesc(), "平台地址", "无参数");

    /**
     * Builds the protocol container and attaches the MQTT pair first, then the
     * HTTP pair (metadata followed by its codec in both cases).
     *
     * @return the populated container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "JetlinksProtocolRegister", "Jetlinks协议", "Jetlinks协议支持MQTT、HTTP");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG)
                .addProtocolCodec(new JetlinksMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG)
                .addProtocolCodec(new JetlinksHttpProtocolCodec());

        return container;
    }
}
文档更新时间: 2023-09-11 14:26 作者:管理员