内置

飞企标准协议(直连设备)

直连设备接入,标准数据格式无缝进行编码、解码操作,无需转换。

HTTP 协议实现

public class StandardHttpProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload into the standard data structure.
     * <p>
     * The concrete type of the reply message held by the context selects how the payload
     * is parsed; reply messages of any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the HTTP payload and the codec parameters
     * @return the reply message populated from the payload, or the original one when its type is not handled
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        if (target instanceof DeviceReplyMessage) {
            return decodeDevice((DeviceReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof EventReplyMessage) {
            return decodeEvent((EventReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof FuncReplyMessage) {
            return decodeFunc(uplink, codecParam);
        }
        // Any other source: nothing to decode, hand the message back as received.
        return target;
    }

    /** Parses the payload as device metadata, prefixes its id and appends it to the reply. */
    private ReplyMessage decodeDevice(DeviceReplyMessage reply, HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
        device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
        return reply.addDeviceMetadata(device);
    }

    /** Parses the payload as event metadata, prefixes its id and device id and appends it to the reply. */
    private ReplyMessage decodeEvent(EventReplyMessage reply, HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
        event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
        event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
        return reply.addEventMetadata(event);
    }

    /** Builds a fresh function reply from the payload (the context's instance is not reused) and prefixes its device id. */
    private ReplyMessage decodeFunc(HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
        parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
        return parsed;
    }

    /**
     * Encodes a function call for HTTP. Downlink over HTTP is currently not supported.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback offered by the framework; not used by this codec
     * @return an {@code HttpIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        // HTTP has no platform-scheduled downlink, so the state is always TO_BE_RETURNED.
        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());

        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();
        command.setDeviceId(ProtocolUtil.setPrefix(codecParam, command.getDeviceId()));
        return new HttpIssue(command);
    }

}

MQTT 协议实现

public class StandardMqttProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload into the standard data structure.
     * <p>
     * The concrete type of the reply message held by the context selects how the payload
     * is parsed; reply messages of any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the MQTT payload and the codec parameters
     * @return the reply message populated from the payload, or the original one when its type is not handled
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        if (target instanceof DeviceReplyMessage) {
            return decodeDevice((DeviceReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof EventReplyMessage) {
            return decodeEvent((EventReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof FuncReplyMessage) {
            return decodeFunc(uplink, codecParam);
        }
        // Any other source: nothing to decode, hand the message back as received.
        return target;
    }

    /** Parses the payload as device metadata, prefixes its id and appends it to the reply. */
    private ReplyMessage decodeDevice(DeviceReplyMessage reply, MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
        device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
        return reply.addDeviceMetadata(device);
    }

    /** Parses the payload as event metadata, prefixes its id and device id and appends it to the reply. */
    private ReplyMessage decodeEvent(EventReplyMessage reply, MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
        event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
        event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
        return reply.addEventMetadata(event);
    }

    /** Builds a fresh function reply from the payload (the context's instance is not reused) and prefixes its device id. */
    private ReplyMessage decodeFunc(MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
        parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
        return parsed;
    }

    /**
     * Encodes a function call as an MQTT downlink instruction.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback offered by the framework; not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        // MQTT supports platform-scheduled downlink, so the state is always TO_BE_SENT.
        command.setState(FuncStatusType.TO_BE_SENT.getCode());

        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();
        command.setDeviceId(ProtocolUtil.setPrefix(codecParam, command.getDeviceId()));
        return new MqttIssue(command);
    }
}

注册器

public class StandardProtocolRegister implements ProtocolRegister {

    /** MQTT codec configuration: only the optional unique-identifier prefix is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /** HTTP codec configuration: only the optional unique-identifier prefix is declared. */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "目前只支持上行,暂不支持下行")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * Builds the container for the standard (directly connected device) protocol and
     * registers its MQTT codec first, then its HTTP codec.
     *
     * @return the populated protocol container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "StandardProtocolRegister",
                "标准协议",
                "标准协议支持MQTT、HTTP,其中HTTP只支持上行");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG).addProtocolCodec(new StandardMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG).addProtocolCodec(new StandardHttpProtocolCodec());

        return container;
    }

}

飞企边缘网关(网关设备)

边缘网关接入,通过网关编排引擎进行标准数据格式的转换,同时提供直接进入网关配置页面的入口。

HTTP 协议实现

public class EdgeBoxHttpProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload into the standard data structure.
     * <p>
     * The concrete type of the reply message held by the context selects how the payload
     * is parsed; reply messages of any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the HTTP payload and the codec parameters
     * @return the reply message populated from the payload, or the original one when its type is not handled
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        if (target instanceof DeviceReplyMessage) {
            return decodeDevice((DeviceReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof EventReplyMessage) {
            return decodeEvent((EventReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof FuncReplyMessage) {
            return decodeFunc(uplink, codecParam);
        }
        // Any other source: nothing to decode, hand the message back as received.
        return target;
    }

    /** Parses the payload as device metadata, prefixes its id and appends it to the reply. */
    private ReplyMessage decodeDevice(DeviceReplyMessage reply, HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
        device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
        return reply.addDeviceMetadata(device);
    }

    /** Parses the payload as event metadata, prefixes its id and device id and appends it to the reply. */
    private ReplyMessage decodeEvent(EventReplyMessage reply, HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
        event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
        event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
        return reply.addEventMetadata(event);
    }

    /** Builds a fresh function reply from the payload (the context's instance is not reused) and prefixes its device id. */
    private ReplyMessage decodeFunc(HttpPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
        parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
        return parsed;
    }

    /**
     * Encodes a function call for HTTP.
     * <p>
     * For the {@code PLATFORM_URL} function the result is set to the gateway host followed by
     * an {@code access_token} query parameter obtained through {@code EdgeBoxUtil.getToken}.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback offered by the framework; not used by this codec
     * @return an {@code HttpIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();

        // HTTP has no platform-scheduled downlink, so the state is always TO_BE_RETURNED.
        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());

        // Single-sign-on address of the edge gateway service.
        if (SubsystemFunc.PLATFORM_URL.match(command.getFunc())) {
            String gatewayHost = codecParam.getStr("host");
            String token = EdgeBoxUtil.getToken(
                    codecParam.getStr("username"),
                    codecParam.getStr("password"),
                    gatewayHost);
            command.setRes(gatewayHost + "?access_token=" + token);
        }

        command.setDeviceId(ProtocolUtil.setPrefix(
                encoderCodecContext.getCodecMetadataParam(),
                command.getDeviceId()));
        return new HttpIssue(command);
    }
}

MQTT 协议实现

public class EdgeBoxMqttProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload into the standard data structure.
     * <p>
     * The concrete type of the reply message held by the context selects how the payload
     * is parsed; reply messages of any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the MQTT payload and the codec parameters
     * @return the reply message populated from the payload, or the original one when its type is not handled
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();
        JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();

        if (target instanceof DeviceReplyMessage) {
            return decodeDevice((DeviceReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof EventReplyMessage) {
            return decodeEvent((EventReplyMessage) target, uplink, codecParam);
        }
        if (target instanceof FuncReplyMessage) {
            return decodeFunc(uplink, codecParam);
        }
        // Any other source: nothing to decode, hand the message back as received.
        return target;
    }

    /** Parses the payload as device metadata, prefixes its id and appends it to the reply. */
    private ReplyMessage decodeDevice(DeviceReplyMessage reply, MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        DeviceMetadata device = JSONUtil.toBean(body, DeviceMetadata.class);
        device.setId(ProtocolUtil.setPrefix(codecParam, device.getId()));
        return reply.addDeviceMetadata(device);
    }

    /** Parses the payload as event metadata, prefixes its id and device id and appends it to the reply. */
    private ReplyMessage decodeEvent(EventReplyMessage reply, MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        EventMetadata event = JSONUtil.toBean(body, EventMetadata.class);
        event.setId(ProtocolUtil.setPrefix(codecParam, event.getId()));
        event.setDeviceId(ProtocolUtil.setPrefix(codecParam, event.getDeviceId()));
        return reply.addEventMetadata(event);
    }

    /** Builds a fresh function reply from the payload (the context's instance is not reused) and prefixes its device id. */
    private ReplyMessage decodeFunc(MqttPayload uplink, JSONObject codecParam) {
        JSONObject body = JSONUtil.parseObj(uplink.getPayload());
        FuncReplyMessage parsed = JSONUtil.toBean(body, FuncReplyMessage.class);
        parsed.setDeviceId(ProtocolUtil.setPrefix(codecParam, parsed.getDeviceId()));
        return parsed;
    }

    /**
     * Encodes a function call as an MQTT downlink instruction.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback offered by the framework; not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        // MQTT supports platform-scheduled downlink, so the state is always TO_BE_SENT.
        command.setState(FuncStatusType.TO_BE_SENT.getCode());

        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();
        command.setDeviceId(ProtocolUtil.setPrefix(codecParam, command.getDeviceId()));
        return new MqttIssue(command);
    }
}

特殊工具类

public class EdgeBoxUtil {

    /**
     * Requests an access token from the edge gateway.
     * <p>
     * Sends a password-grant request to {@code url + "/auth/token"} and returns the
     * {@code access_token} field of the JSON response.
     *
     * @param username gateway account name
     * @param password gateway account password
     * @param url      base address of the gateway service; {@code "/auth/token"} is appended as-is
     * @return the non-empty access token
     * @throws BaseException with {@code BaseErrors.ERR_1007} and the raw response when the response
     *                       is not a JSON object or carries no usable {@code access_token}
     */
    public static String getToken(String username, String password,String url) {
        JSONObject credentials = JSONUtil.createObj()
                .set("client_id","node-red-admin")
                .set("grant_type","password")
                .set("scope","*")
                .set("username",username)
                .set("password",password);
        String result = post(url+"/auth/token", credentials, "application/json");
        if (JSONUtil.isJsonObj(result)) {
            String token = JSONUtil.parseObj(result).getStr("access_token");
            // A JSON body without a token (e.g. an error object) must not be passed on as null:
            // callers concatenate the return value straight into a login URL.
            if (token != null && !token.isEmpty()) {
                return token;
            }
        }
        throw new BaseException(BaseErrors.ERR_1007,result);
    }

    /**
     * Sends a POST request to a third-party service and returns the response body.
     *
     * @param url         target address
     * @param body        JSON object serialized with {@code toString()} as the request body
     * @param contentType value used as the content type of the request body
     * @return the response body as text
     */
    public static String post(String url, JSONObject body, String contentType){
        return HttpRequest.post(url)
                .charset("UTF-8")
                .body(body.toString(), contentType)
                .execute().body();
    }

}

注册器

public class EdgeBoxProtocolRegister implements ProtocolRegister {

    /** MQTT codec configuration: only the optional unique-identifier prefix is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /** HTTP codec configuration: gateway account, password, service address and the optional prefix. */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "目前只支持上行,暂不支持下行")
                    .addProperty("username", "账号", "边缘网关账号")
                    .addProperty("password", "密码", "边缘网关密码")
                    .addProperty("host", "网关服务地址", "网关服务地址")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * Builds the container for the edge gateway protocol and registers its MQTT codec
     * first, then its HTTP codec.
     *
     * @return the populated protocol container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "EdgeBoxProtocolRegister",
                "标准边缘网关协议",
                "边缘网关协议支持MQTT、HTTP,其中HTTP只支持上行");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG).addProtocolCodec(new EdgeBoxMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG).addProtocolCodec(new EdgeBoxHttpProtocolCodec());

        return container;
    }

}

飞企物联平台(子系统设备)

飞企物联平台接入,通过实现标准编解码协议接口,完成不同通信协议的编码、解码操作,同时提供设备信息、告警信息等数据获取的功能实现。

  • 设备上报数据格式:参考“飞企物联平台设备上报数据格式”

  • 告警上报数据格式:参考“飞企物联平台告警上报数据格式”

HTTP 协议实现

public class JetlinksHttpProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.HTTP}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.HTTP.getCode();
    }

    /**
     * Decodes an uplink HTTP payload into the standard data structure.
     * <p>
     * Device and event reply messages are filled through {@code JetlinksUtil}; function
     * reply messages and any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the HTTP payload and the codec parameters
     * @return the reply message, populated when it is a device or event reply
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        HttpPayload uplink = (HttpPayload) decoderCodecContext.getPayload();

        if (target instanceof DeviceReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((DeviceReplyMessage) target)
                    .addList(JetlinksUtil.decoderDeviceReplyMessage(codecParam, body));
        }
        if (target instanceof EventReplyMessage) {
            JSONObject codecParam = decoderCodecContext.getCodecMetadataParam();
            JSONObject body = JSONUtil.parseObj(uplink.getPayload());
            return ((EventReplyMessage) target)
                    .addList(JetlinksUtil.decoderEventReplyMessage(codecParam, body));
        }
        // Function replies are not handled yet; every other source is passed through as well.
        return target;
    }

    /**
     * Encodes a function call by executing the matching built-in HTTP function.
     * <p>
     * Device list, device info and event list results are pushed through the callback;
     * the platform URL is written to the reply's result field. The state is set to
     * {@code TO_BE_RETURNED} in every case.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback used to push device / event data obtained from the platform
     * @return an {@code HttpIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        JSONObject codecParam = encoderCodecContext.getCodecMetadataParam();

        // Built-in functions available in HTTP mode.
        if (SubsystemFunc.GET_DEVICE_LIST.match(command.getFunc())) {
            pushDeviceList(encoderCodecContext, encoderCallback, codecParam, command);
        } else if (SubsystemFunc.GET_DEVICE_INFO.match(command.getFunc())) {
            pushDeviceInfo(encoderCodecContext, encoderCallback, codecParam, command);
        } else if (SubsystemFunc.GET_EVENT_LIST.match(command.getFunc())) {
            pushEventList(encoderCodecContext, encoderCallback, codecParam, command);
        } else if (SubsystemFunc.PLATFORM_URL.match(command.getFunc())) {
            // Single-sign-on address of the platform.
            command.setRes(codecParam.getStr("platHost"));
        } else if (SubsystemFunc.VIDEO_STREAM_URL.match(command.getFunc())) {
            // Video stream address (HLS) - not implemented yet.
        }
        // Further functions can be added as additional branches above.

        command.setState(FuncStatusType.TO_BE_RETURNED.getCode());
        return new HttpIssue(command);
    }

    /** Fetches the device list from the platform and pushes it through the callback. */
    private void pushDeviceList(EncoderCodecContext context, EncoderCallback callback,
                                JSONObject codecParam, FuncReplyMessage command) {
        callback.reply(context,
                new DeviceReplyMessage().addList(JetlinksUtil.getDeviceList(codecParam, command.getReq())));
    }

    /** Fetches a single device's information from the platform and pushes it through the callback. */
    private void pushDeviceInfo(EncoderCodecContext context, EncoderCallback callback,
                                JSONObject codecParam, FuncReplyMessage command) {
        callback.reply(context,
                new DeviceReplyMessage().addDeviceMetadata(JetlinksUtil.getDeviceInfo(codecParam, command.getReq())));
    }

    /** Fetches the alarm list from the platform and pushes it through the callback. */
    private void pushEventList(EncoderCodecContext context, EncoderCallback callback,
                               JSONObject codecParam, FuncReplyMessage command) {
        callback.reply(context,
                new EventReplyMessage().addList(JetlinksUtil.getEventList(codecParam, command.getReq())));
    }

}

MQTT 协议实现

public class JetlinksMqttProtocolCodec implements ProtocolCodec {

    /**
     * Identifies the transport handled by this codec.
     *
     * @return the code of {@code DefaultTransport.MQTT}
     */
    @Override
    public String getTransport() {
        return DefaultTransport.MQTT.getCode();
    }

    /**
     * Decodes an uplink MQTT payload into the standard data structure.
     * <p>
     * Device and event reply messages are filled through {@code JetlinksUtil}; function
     * reply messages and any other type are returned untouched.
     *
     * @param decoderCodecContext context supplying the reply message, the MQTT payload and the codec parameters
     * @return the reply message, populated when it is a device or event reply
     */
    @Override
    public ReplyMessage decoder(DecoderCodecContext decoderCodecContext) {
        ReplyMessage target = decoderCodecContext.getReplyMessage();
        MqttPayload uplink = (MqttPayload) decoderCodecContext.getPayload();

        if (target instanceof DeviceReplyMessage) {
            return ((DeviceReplyMessage) target).addList(
                    JetlinksUtil.decoderDeviceReplyMessage(
                            decoderCodecContext.getCodecMetadataParam(),
                            JSONUtil.parseObj(uplink.getPayload())));
        }
        if (target instanceof EventReplyMessage) {
            return ((EventReplyMessage) target).addList(
                    JetlinksUtil.decoderEventReplyMessage(
                            decoderCodecContext.getCodecMetadataParam(),
                            JSONUtil.parseObj(uplink.getPayload())));
        }
        // Function replies are not handled yet; every other source is passed through as well.
        return target;
    }

    /**
     * Encodes a function call as an MQTT downlink instruction.
     *
     * @param encoderCodecContext context supplying the function reply message and the codec parameters
     * @param encoderCallback     callback offered by the framework; not used by this codec
     * @return an {@code MqttIssue} wrapping the updated function reply message
     */
    @Override
    public Issue encode(EncoderCodecContext encoderCodecContext, EncoderCallback encoderCallback) {
        FuncReplyMessage command = encoderCodecContext.getFuncReplyMessage();
        // MQTT supports platform-scheduled downlink, so the state is always TO_BE_SENT.
        command.setState(FuncStatusType.TO_BE_SENT.getCode());
        command.setDeviceId(ProtocolUtil.setPrefix(
                encoderCodecContext.getCodecMetadataParam(),
                command.getDeviceId()));
        return new MqttIssue(command);
    }

}

特殊工具类

public class JetlinksUtil {

    /** Value of the "status" field that this class treats as a successful platform response. */
    private static final int STATUS_OK = 200;

    /**
     * Sends a signed POST request.
     * <p>
     * The signature is the MD5 hex digest of {@code body + timestamp + secureKey} and is sent,
     * together with the timestamp and client id, in the {@code X-Sign}, {@code X-Timestamp}
     * and {@code X-Client-Id} headers.
     *
     * @param secureKey   secret appended to the signed text
     * @param clientId    value of the {@code X-Client-Id} header
     * @param url         target address
     * @param body        JSON object serialized with {@code toString()} as the request body
     * @param contentType value used as the content type of the request body
     * @return the response body as text
     */
    public static String post(String secureKey,String clientId,String url,JSONObject body,String contentType){
        String timestamp = String.valueOf(System.currentTimeMillis());
        String sign = DigestUtils.md5Hex(body.toString()+timestamp+secureKey);
        return HttpRequest.post(url)
                .header("X-Timestamp",timestamp,false)
                .header("X-Client-Id",clientId,false)
                .header("X-Sign",sign,false)
                .charset("UTF-8")
                .body(body.toString(), contentType)
                .execute().body();
    }

    /**
     * Sends a signed GET request with form parameters.
     * <p>
     * The signed text is the parameters joined as {@code key=value} pairs separated by
     * {@code &} in the map's iteration order, followed by the timestamp and the secure key.
     *
     * @param secureKey secret appended to the signed text
     * @param clientId  value of the {@code X-Client-Id} header
     * @param url       target address
     * @param formMap   form parameters; also the input of the signature
     * @return the response body as text
     */
    public static String getForm(String secureKey, String clientId, String url, Map<String, Object> formMap){
        // Join as "k1=v1&k2=v2"; the separator is only written between pairs.
        StringBuilder data = new StringBuilder();
        for (Map.Entry<String, Object> entry : formMap.entrySet()) {
            if (data.length() > 0) {
                data.append("&");
            }
            data.append(entry.getKey()).append("=").append(entry.getValue());
        }

        String timestamp = String.valueOf(System.currentTimeMillis());
        String sign = DigestUtils.md5Hex(data.toString()+timestamp+secureKey);
        return HttpRequest.get(url)
                .header("X-Timestamp",timestamp,false)
                .header("X-Client-Id",clientId,false)
                .header("X-Sign",sign,false)
                .form(formMap)
                .execute().body();
    }

    /**
     * Maps a platform alarm name to an {@code EventType} code.
     * <p>
     * Returns the code of the first {@code EventType} whose description is contained in the name.
     *
     * @param eventName alarm name reported by the platform; may be {@code null}
     * @return the matching type code, or the code of {@code EventType.OTHER_ALARM} when the
     *         name is {@code null} or nothing matches
     */
    public static String decoderEventType(String eventName){
        // A missing alarm name cannot match any description; avoid the NPE on contains().
        if (eventName != null) {
            for (EventType candidate : EventType.values()) {
                if (eventName.contains(candidate.getDesc())) {
                    return candidate.getCode();
                }
            }
        }
        return EventType.OTHER_ALARM.getCode();
    }

    /**
     * Maps a platform device state to a {@code DeviceStatusType} code.
     * <p>
     * Source values: {text=未启用, value=notActive}, {text=离线, value=offline}, {text=在线, value=online}.
     *
     * @param state platform state value
     * @return NOT_ENABLED for "notActive", OFFLINE for "offline", ONLINE for anything else
     */
    public static Integer decoderDeviceState(String state){
        if ("notActive".equals(state)) {
            return DeviceStatusType.NOT_ENABLED.getCode();
        }
        if ("offline".equals(state)) {
            return DeviceStatusType.OFFLINE.getCode();
        }
        // Everything else is treated as online.
        return DeviceStatusType.ONLINE.getCode();
    }

    /**
     * Maps a platform device type to the local device type.
     * <p>
     * No mapping is implemented yet: the input is returned unchanged.
     *
     * @param type platform device type value
     * @return {@code type} as given
     */
    public static String decoderDeviceType(String type){
        return type;
    }

    /**
     * Decodes pushed device data.
     * <p>
     * Entries without a non-blank {@code deviceId} are skipped.
     *
     * @param param codec parameters, passed to {@code ProtocolUtil.setPrefix} for the device id
     * @param data  pushed data (source JSON)
     * @return the decoded devices, possibly empty
     */
    public static List<DeviceMetadata> decoderDeviceReplyMessage(JSONObject param,JSONObject data){
        List<DeviceMetadata> list = new ArrayList<>();

        JSONArray array = wrapAsArray(data);
        for (int i = 0; i < array.size(); i++) {
            JSONObject obj = array.getJSONObject(i);
            if (obj == null) {
                continue;
            }
            String deviceId = obj.getStr("deviceId");
            if (StrUtil.isNotBlank(deviceId)) {
                DeviceMetadata device = new DeviceMetadata();
                device.setId(ProtocolUtil.setPrefix(param,deviceId));
                device.setProperties(obj.getJSONObject("properties"));
                list.add(device);
            }
        }
        return list;
    }


    /**
     * Decodes pushed event (alarm) data.
     * <p>
     * Entries without a non-blank {@code alarmId} are skipped.
     *
     * @param param codec parameters, passed to {@code ProtocolUtil.setPrefix} for the ids
     * @param data  pushed data (source JSON)
     * @return the decoded events, possibly empty
     */
    public static List<EventMetadata> decoderEventReplyMessage(JSONObject param,JSONObject data){
        List<EventMetadata> list = new ArrayList<>();

        JSONArray array = wrapAsArray(data);
        for (int i = 0; i < array.size(); i++) {
            JSONObject obj = array.getJSONObject(i);
            if (obj == null) {
                continue;
            }
            String eventId = obj.getStr("alarmId");
            if (StrUtil.isNotBlank(eventId)) {
                EventMetadata event = new EventMetadata();
                event.setId(ProtocolUtil.setPrefix(param,eventId));
                event.setDeviceId(ProtocolUtil.setPrefix(param,obj.getStr("deviceId")));
                event.setName(obj.getStr("alarmName"));
                event.setType(decoderEventType(event.getName()));
                // NOTE(review): the creation time is read from "alarmData", while getEventList
                // reads it from "alarmTime" - confirm against the alarm push format.
                event.setCreateTime(obj.getDate("alarmData"));
                event.setProperties(obj.getJSONObject("properties"));
                list.add(event);
            }
        }
        return list;
    }


    /**
     * Fetches the device list from the platform.
     *
     * @param param codec parameters; {@code secureKey}, {@code clientId} and {@code apiHost} are read
     * @param req   request parameters; currently not used
     * @return the devices of the returned page; empty when the response carries no rows
     * @throws BaseException with {@code BaseErrors.ERR_1007} and the raw response when the response
     *                       is not a JSON object with status 200
     */
    public static List<DeviceMetadata> getDeviceList(JSONObject param,String req){
        // To be continued: filter by deviceType / pageIndex / pageSize taken from req.
        // NOTE(review): the original note says "query all devices (no paging)" but the request
        // sends paging = "true"; confirm which one is intended before changing the value.
        JSONObject queryParamEntity = JSONUtil.createObj().set("paging","true");
        String result = post(param.getStr("secureKey"),param.getStr("clientId"),param.getStr("apiHost")+"/api/v1/device/_query",queryParamEntity,"application/json");

        JSONObject response = parseSuccess(result);
        if (response == null) {
            throw new BaseException(BaseErrors.ERR_1007,result);
        }

        List<DeviceMetadata> list = new ArrayList<>();
        JSONArray rows = pageRows(response);
        for (int i = 0; i < rows.size(); i++) {
            JSONObject device = rows.getJSONObject(i);

            DeviceMetadata deviceEntity = new DeviceMetadata();
            deviceEntity.setId(ProtocolUtil.setPrefix(param,device.getStr("id")));
            deviceEntity.setName(device.getStr("name"));
            deviceEntity.setClassify(decoderDeviceType(device.getStr("productId")));
            deviceEntity.setState(decoderDeviceState(device.getStr("state")));
            list.add(deviceEntity);
        }
        return list;
    }


    /**
     * Fetches the latest property values of one device from the platform.
     * <p>
     * The device id is taken from the {@code deviceId} field of {@code req}; when {@code req}
     * is not a JSON object or has no such field, the {@code deviceId} entry of {@code param}
     * is used as before. The id is passed through {@code ProtocolUtil.removePrefix} before it
     * is sent to the platform, both as the form field and in the URL path.
     *
     * @param param codec parameters; {@code secureKey}, {@code clientId} and {@code apiHost} are read
     * @param req   request parameters, expected to be a JSON object containing {@code deviceId}
     * @return device metadata carrying the id as requested and the property map
     * @throws BaseException with {@code BaseErrors.ERR_1007} and the raw response when the response
     *                       is not a JSON object with status 200
     */
    public static DeviceMetadata getDeviceInfo(JSONObject param,String req){
        // The function is registered with deviceId as a request parameter, so req wins;
        // the codec parameters are only a fallback for existing callers.
        String id = null;
        if (JSONUtil.isJsonObj(req)) {
            id = JSONUtil.parseObj(req).getStr("deviceId");
        }
        if (StrUtil.isBlank(id)) {
            id = param.getStr("deviceId");
        }

        // The platform only knows the id without the local prefix.
        Map<String, Object> formMap = new HashMap<>();
        formMap.put("deviceId",ProtocolUtil.removePrefix(param,id));
        String path = StrUtil.format("/api/v1/device/{}/properties/_latest",formMap.get("deviceId"));
        String result = getForm(param.getStr("secureKey"),param.getStr("clientId"),param.getStr("apiHost")+path,formMap);

        JSONObject response = parseSuccess(result);
        if (response == null) {
            throw new BaseException(BaseErrors.ERR_1007,result);
        }

        JSONObject properties = new JSONObject();
        JSONArray attributes = response.getJSONArray("result");
        if (attributes != null) {
            for (int i = 0; i < attributes.size(); i++) {
                JSONObject attribute = attributes.getJSONObject(i);
                if (attribute != null) {
                    properties.set(attribute.getStr("property"),attribute.getStr("value"));
                }
            }
        }

        DeviceMetadata device = new DeviceMetadata();
        device.setId(id);
        device.setProperties(properties);
        return device;
    }


    /**
     * Fetches the latest alarm records from the platform, newest first.
     *
     * @param param codec parameters; {@code secureKey}, {@code clientId} and {@code apiHost} are read
     * @param req   optional JSON object with {@code pageIndex} (default 1) and {@code pageSize}
     *              (default 20); a missing field keeps its default
     * @return the events of the returned page; empty when the response carries no rows
     * @throws BaseException with {@code BaseErrors.ERR_1007} and the raw response when the response
     *                       is not a JSON object with status 200
     */
    public static List<EventMetadata> getEventList(JSONObject param,String req){
        Integer pageIndex = 1;
        Integer pageSize = 20;
        if (JSONUtil.isJsonObj(req)) {
            JSONObject reqObject = JSONUtil.parseObj(req);
            // Keep the defaults when a field is absent instead of overwriting them with null.
            pageIndex = reqObject.getInt("pageIndex", pageIndex);
            pageSize = reqObject.getInt("pageSize", pageSize);
        }

        JSONObject queryParamEntity = JSONUtil.createObj()
                .set("pageIndex",pageIndex)
                .set("pageSize",pageSize)
                .set("orderBy","alarmTime desc ");
        String result = post(param.getStr("secureKey"),param.getStr("clientId"),param.getStr("apiHost")+"/device/alarm/history/_query",queryParamEntity,"application/json");

        JSONObject response = parseSuccess(result);
        if (response == null) {
            throw new BaseException(BaseErrors.ERR_1007,result);
        }

        List<EventMetadata> list = new ArrayList<>();
        JSONArray rows = pageRows(response);
        for (int i = 0; i < rows.size(); i++) {
            JSONObject event = rows.getJSONObject(i);

            EventMetadata eventEntity = new EventMetadata();
            eventEntity.setId(ProtocolUtil.setPrefix(param,event.getStr("id")));
            eventEntity.setName(event.getStr("alarmName"));
            eventEntity.setDeviceId(ProtocolUtil.setPrefix(param,event.getStr("deviceId")));
            eventEntity.setType(decoderEventType(eventEntity.getName()));
            eventEntity.setDetails(event.getStr("alarmData"));
            eventEntity.setCreateTime(event.getDate("alarmTime"));
            eventEntity.setState(EventStatusType.PENDING.getCode());
            list.add(eventEntity);
        }
        return list;
    }

    /**
     * Wraps pushed data in a JSON array: parsed as an array when its JSON text is one,
     * otherwise returned as a single-element array.
     */
    private static JSONArray wrapAsArray(JSONObject data) {
        // NOTE(review): data is already a JSONObject, so the array branch looks unreachable;
        // it is kept to preserve the existing behavior.
        if (JSONUtil.isJsonArray(JSONUtil.toJsonStr(data))) {
            return JSONUtil.parseArray(data);
        }
        JSONArray single = JSONUtil.createArray();
        single.set(data);
        return single;
    }

    /** Parses a platform response; returns it only when it is a JSON object with status 200, otherwise null. */
    private static JSONObject parseSuccess(String result) {
        if (!JSONUtil.isJsonObj(result)) {
            return null;
        }
        JSONObject response = JSONUtil.parseObj(result);
        Integer status = response.getInt("status");
        // Null-safe comparison: a response without "status" is a failure, not an NPE.
        return (status != null && status == STATUS_OK) ? response : null;
    }

    /** Returns the "result.data" rows of a paged response, or an empty array when they are absent. */
    private static JSONArray pageRows(JSONObject response) {
        JSONObject page = response.getJSONObject("result");
        JSONArray rows = (page == null) ? null : page.getJSONArray("data");
        // The reported "total" is ignored: only the rows actually present can be iterated.
        return (rows == null) ? JSONUtil.createArray() : rows;
    }
}

注册器

public class JetlinksProtocolRegister implements ProtocolRegister {

    /** MQTT codec configuration: only the optional unique-identifier prefix is declared. */
    private static final CodecMetadata MQTT_CONFIG =
            new CodecMetadata(DefaultTransport.MQTT.getCode(), "MQTT配置", "无需配置认证参数")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false);

    /**
     * HTTP codec configuration: Jetlinks credentials and addresses, the optional prefix,
     * and the functions callable over HTTP.
     */
    private static final CodecMetadata HTTP_CONFIG =
            new CodecMetadata(DefaultTransport.HTTP.getCode(), "HTTP配置", "根据jetlinks提供的认证信息,进行http访问调用")
                    .addProperty("clientId", "授权clientId", "Jetlinks物联平台clientId")
                    .addProperty("secureKey", "授权secureKey", "Jetlinks物联平台secureKey")
                    .addProperty("apiHost", "接口服务地址", "Jetlinks物联平台接口服务地址")
                    .addProperty("platHost", "平台地址", "Jetlinks物联平台地址")
                    .addProperty(SubsystemConstants.CODE_PREFIX, SubsystemConstants.CODE_PREFIX, "唯一标识前缀", false)
                    .addFuncProperty(SubsystemFunc.GET_DEVICE_LIST.getCode(), SubsystemFunc.GET_DEVICE_LIST.getDesc(), "获取设备列表", "无参数")
                    .addFuncProperty(SubsystemFunc.GET_DEVICE_INFO.getCode(), SubsystemFunc.GET_DEVICE_INFO.getDesc(), "获取设备信息", "参数deviceId不能为空")
                    .addFuncProperty(SubsystemFunc.GET_EVENT_LIST.getCode(), SubsystemFunc.GET_EVENT_LIST.getDesc(), "获取告警记录", "参数pageIndex默认1、pageSize默认20,非必填")
                    .addFuncProperty(SubsystemFunc.PLATFORM_URL.getCode(), SubsystemFunc.PLATFORM_URL.getDesc(), "平台地址", "无参数");

    /**
     * Builds the container for the Jetlinks protocol and registers its MQTT codec
     * first, then its HTTP codec.
     *
     * @return the populated protocol container
     */
    @Override
    public ProtocolContainer register() {
        ProtocolContainer container = new ProtocolContainer(
                "JetlinksProtocolRegister",
                "Jetlinks协议",
                "Jetlinks协议支持MQTT、HTTP");

        // MQTT registration
        container.addCodecMetadata(MQTT_CONFIG).addProtocolCodec(new JetlinksMqttProtocolCodec());

        // HTTP registration
        container.addCodecMetadata(HTTP_CONFIG).addProtocolCodec(new JetlinksHttpProtocolCodec());

        return container;
    }

}
文档更新时间: 2023-09-11 14:26   作者:管理员