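Feature overview

Architecture:

Table data: Logstash > Kafka > jar > Elasticsearch > Kibana
a. The business side (or the data maintainers) configures the Logstash parameters as required for the tables whose data needs to be indexed, then runs the Logstash service; the data is pushed to Kafka.
b. The jar service consumes the Kafka data, validates and cleans it, and syncs it to Elasticsearch.
c. Kibana is used to query the data in Elasticsearch, and a standardized search API is provided to the business side according to the results it needs.

Files: OSS > Kafka > jar > Elasticsearch > Kibana
a. The business side (or the data maintainers) configures the Logstash parameters as required for the OSS file address data of the tables to be indexed, then runs the Logstash service; the data is pushed to Kafka.
b. The jar service incrementally fetches the OSS files, reads their content, and syncs it to Elasticsearch according to the data specification.
c. Kibana is used to query the data in Elasticsearch, and a standardized search API is provided to the business side according to the results it needs.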

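Data and API specification

Data dictionary

Definition:

{
    "id":"ID generated by the snowflake algorithm",
    "title":"title text",
    "content":"detail text",
    "buzType":"business type (business-defined)",
    "url":"link to the business detail page (business-defined)",
    "userIds":"user IDs (comma-separated)",
    "open":"whether the record is public (true means public, with userIds empty; otherwise false)",
    "comIds":"company IDs (comma-separated)",
    "tableName":"table name",
    "tableId":"table ID",
    "fileType":"file type (fill in when the record is a file)",
    "publicTime":"publish time (falls back to createTime when absent)",
    "createTime":"creation time (must not be empty)",
    "updateTime":"update time (must not be empty)",
    "iconUrl":"avatar link",
    "peopleName":"name shown with the avatar",
    "fromTypes":"search source: APP/WEB",
    "fromTypeList":["APP","WEB"],
    "userIdList":["userID1","userID2"],
    "comIdList":["comId1","comId2"],
    "fileAttachId":"stores the address of the internal OSS file component",
    "fileCode":"stores the address of the internal OSS file",
    "show":"business-logic flag: true if the record should be searchable, false if it should not be or should be deleted",
    "staffIds": "staff IDs, comma-separated",
    "staffIdList": ["staffId1","staffId2"],
    "staffPostIds": "staff post IDs, comma-separated",
    "staffPostIdList":  ["staffPostId1","staffPostId2"],
    "actionCode": "reserved field: unique identifier for jumping to the business detail"
}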
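
The field rules in the dictionary can be sketched as a small normalization step. This is only an illustration, not the jar service's code: the function names are hypothetical, and it assumes that each *List field simply mirrors its comma-separated counterpart and that fromTypes is comma-separated as well.

```python
def split_ids(value):
    """Split a comma-separated string into a list, dropping empty items."""
    if not value:
        return []
    return [item.strip() for item in value.split(",") if item.strip()]


def normalize_record(record):
    """Return a copy of record with the derived fields filled in.

    Assumed rules, read off the field descriptions:
    - createTime and updateTime must not be empty;
    - publicTime falls back to createTime when absent;
    - each *List field mirrors its comma-separated counterpart.
    """
    for required in ("createTime", "updateTime"):
        if not record.get(required):
            raise ValueError(f"{required} must not be empty")

    result = dict(record)
    result["publicTime"] = record.get("publicTime") or record["createTime"]
    pairs = {
        "userIds": "userIdList",
        "comIds": "comIdList",
        "fromTypes": "fromTypeList",
        "staffIds": "staffIdList",
        "staffPostIds": "staffPostIdList",
    }
    for source, target in pairs.items():
        result[target] = split_ids(record.get(source))
    return result
```

A record that lacks updateTime is rejected here rather than indexed, which matches the "must not be empty" wording of the dictionary.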

Address book (通讯录) example:

{
    "id": "1296358706270834688",
    "title": "黄永露",
    "content": "黄永露 13711902104 test@test.com",
    "buzType": "通讯录",
    "url": "cons_staff&1285124693107544064",
    "userIds": "",
    "open": true,
    "comIds": "1284004574981656576",
    "tableName": "cons_staff",
    "tableId": "1285124693107544064",
    "fileType": "",
    "publicTime": "2020-08-19T13:42:20.000+0000",
    "createTime": "2020-08-19T13:42:20.000+0000",
    "updateTime": "2020-08-21T09:10:04.000+0000",
    "iconUrl": null,
    "peopleName": null,
    "userIdList": null,
    "fromTypeList": [
        "WEB",
        "APP"
    ],
    "comIdList": [
        "1284004574981656576"
    ]
}
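
The time fields in this example are strings such as 2020-08-19T13:42:20.000+0000, while the createIndex example and the index mapping later in this document use epoch milliseconds. The document does not say where the conversion happens; the helper below (a hypothetical name) is only a sketch of how such a string can be turned into epoch milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text):
    """Parse a timestamp like 2020-08-19T13:42:20.000+0000 into epoch milliseconds."""
    parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f%z")
    # Integer division of two timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2020-08-19T13:42:20.000+0000"))  # 1597844540000
```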

Query specification

Query the business type list

URL: http://pai.flyrise.cn/dp-search-api/search/searchBuzTypeList
Parameters:

{
    "buzType": "",
    "comId": "1290527607439364096",
    "fromType": "",
    "fileType": "",
    "page": 0,
    "pageSize": 0,
    "queryStr": "二",
    "queryType": 1,
    "userId": "",
    "staffId": ""
}

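Parameter description:

{
    "buzType": "business type (leave empty)",
    "comId": "company ID, comma-separated for multiple values (must not be empty)",
    "fromType": "query source: WEB or APP",
    "fileType": "file type to filter by, when searching files",
    "page": "page number, starting from 0 (use 0 for this endpoint)",
    "pageSize": "number of items per page (use 0 for this endpoint)",
    "queryStr": "query text (must not be empty)",
    "queryType": "query type: 1 searches title and content, 2 searches title only, 3 searches content only (must not be empty)",
    "userId": "user ID, comma-separated for multiple values",
    "staffId": "staff ID, comma-separated for multiple values",
    "staffPostIds": "staff post IDs, comma-separated for multiple values"
}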
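
A caller may want to check the required parameters before sending a request. The sketch below builds the JSON body for searchBuzTypeList and enforces the "must not be empty" rules; the function name and its defaults are hypothetical, and sending the body (HTTP method, headers) is left out because the document does not specify it.

```python
import json

# 1: title and content, 2: title only, 3: content only
VALID_QUERY_TYPES = {1, 2, 3}


def build_search_payload(com_id, query_str, query_type=1, from_type="",
                         file_type="", page=0, page_size=0,
                         user_id="", staff_id=""):
    """Return the request body as a JSON string after validating required fields."""
    if not com_id:
        raise ValueError("comId must not be empty")
    if not query_str:
        raise ValueError("queryStr must not be empty")
    if query_type not in VALID_QUERY_TYPES:
        raise ValueError("queryType must be 1, 2 or 3")
    payload = {
        "buzType": "",
        "comId": com_id,
        "fromType": from_type,
        "fileType": file_type,
        "page": page,
        "pageSize": page_size,
        "queryStr": query_str,
        "queryType": query_type,
        "userId": user_id,
        "staffId": staff_id,
    }
    # ensure_ascii=False keeps Chinese query text readable in logs.
    return json.dumps(payload, ensure_ascii=False)
```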

Result:

{
  "status": 0,
  "msg": "SUCCESS",
  "flyrise": [
    {
      "buzType": "测试集",
      "value": 1
    },
    {
      "buzType": "通讯录",
      "value": 1
    }
  ]
}
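
The responses in this document share the same envelope (status, msg, flyrise). A client-side sketch for reading the result above follows; it assumes that status 0 means success, which the examples suggest but the document does not state, and both function names are hypothetical.

```python
def unwrap(response):
    """Return the flyrise payload, assuming status 0 means success."""
    if response.get("status") != 0:
        raise RuntimeError(f"search API error: {response.get('msg')!r}")
    return response["flyrise"]


def counts_by_buz_type(response):
    """Map each business type to its hit count."""
    return {item["buzType"]: item["value"] for item in unwrap(response)}


sample = {
    "status": 0,
    "msg": "SUCCESS",
    "flyrise": [
        {"buzType": "测试集", "value": 1},
        {"buzType": "通讯录", "value": 1},
    ],
}
print(counts_by_buz_type(sample))
```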

List query specification

URL: http://pai.flyrise.cn/dp-search-api/search/searchList
Parameters:

{
    "buzType": "",
    "comId": "1284004574981656576",
    "fromType": "",
    "fileType": "",
    "page": 0,
    "pageSize": 10,
    "queryStr": "黄永露",
    "queryType": 1,
    "userId": ""
}

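Parameter description:

{
    "buzType": "business type",
    "comId": "company ID, comma-separated for multiple values (must not be empty)",
    "fromType": "query source: WEB or APP",
    "fileType": "file type to filter by, when searching files",
    "page": "page number, starting from 0 (must not be empty)",
    "pageSize": "number of items per page (must not be empty)",
    "queryStr": "query text (must not be empty)",
    "queryType": "query type: 1 searches title and content, 2 searches title only, 3 searches content only (must not be empty)",
    "userId": "user ID, comma-separated for multiple values",
    "staffId": "staff ID, comma-separated for multiple values",
    "staffPostIds": "staff post IDs, comma-separated for multiple values"
}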
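
Because page starts at 0, a client that wants every hit has to walk the pages until the reported total is reached. The sketch below does that with a caller-supplied fetch_page function (hypothetical; it stands in for the actual HTTP call) that returns the flyrise object of a searchList response, with the total and row fields shown in the result for this endpoint.

```python
def iter_rows(fetch_page, page_size=10):
    """Yield every row, requesting pages 0, 1, 2, ... until total is reached.

    fetch_page(page, page_size) must return a dict shaped like
    {"total": <int>, "row": [<record>, ...]}.
    """
    page = 0
    seen = 0
    while True:
        result = fetch_page(page, page_size)
        rows = result.get("row") or []
        for row in rows:
            yield row
        seen += len(rows)
        # Stop on an empty page as well, in case total is inaccurate.
        if not rows or seen >= result.get("total", 0):
            break
        page += 1
```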

Result:

{
  "status": 0,
  "msg": "SUCCESS",
  "flyrise": {
    "total": 1,
    "row": [
      {
        "id": "1296358706270834688",
        "title": "黄永露",
        "content": "黄永露 13711902104 test@test.com",
        "buzType": "通讯录",
        "url": "cons_staff&1285124693107544064",
        "userIds": "",
        "open": true,
        "comIds": "1284004574981656576",
        "tableName": "cons_staff",
        "tableId": "1285124693107544064",
        "fileType": "",
        "publicTime": "2020-08-19T13:42:20.000+0000",
        "createTime": "2020-08-19T13:42:20.000+0000",
        "updateTime": "2020-08-21T09:10:04.000+0000",
        "iconUrl": null,
        "peopleName": null,
        "userIdList": null,
        "comIdList": [
          "1284004574981656576"
        ]
      }
    ]
  }
}
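
In the address book records the url field has the form tableName&tableId. Since url is business-defined, this convention is not guaranteed for other business types; the helper below (a hypothetical name) is a sketch that only applies to records following it.

```python
def parse_detail_url(url):
    """Split a 'tableName&tableId' url into its two parts."""
    table_name, separator, table_id = url.partition("&")
    if not separator or not table_name or not table_id:
        raise ValueError(f"unexpected url format: {url!r}")
    return table_name, table_id


print(parse_detail_url("cons_staff&1285124693107544064"))
```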

Detail query specification

URL: http://pai.flyrise.cn/dp-search-api/search/getDetail
Parameters:

id=1296358706270834688

Parameter description:

id is the id returned by the list query

Result:

{
  "status": 0,
  "msg": "SUCCESS",
  "flyrise": {
    "id": "1296358706270834688",
    "title": "黄永露",
    "content": "黄永露 13711902104 test@test.com",
    "buzType": "通讯录",
    "url": "cons_staff&1285124693107544064",
    "userIds": "",
    "open": true,
    "comIds": "1284004574981656576",
    "tableName": "cons_staff",
    "tableId": "1285124693107544064",
    "fileType": "",
    "publicTime": "2020-08-19T13:42:20.000+0000",
    "createTime": "2020-08-19T13:42:20.000+0000",
    "updateTime": "2020-08-21T09:10:04.000+0000",
    "iconUrl": null,
    "peopleName": null,
    "userIdList": null,
    "comIdList": [
      "1284004574981656576"
    ]
  }
}

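Index deletion specification

URL: http://pai.flyrise.cn/dp-search-api/search/delete
Parameters:

tableId=1285124693107544064
tableName=cons_staff
buzType=通讯录

Parameter description:

tableId=ID of the record in the business table
tableName=business table name
buzType=business type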
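
The getDetail and delete endpoints take plain key=value parameters, and buzType may contain Chinese characters that need percent-encoding. The sketch below assumes the parameters travel in the query string; the document does not state the HTTP method or encoding, so treat both function names and that choice as assumptions.

```python
from urllib.parse import urlencode

BASE_URL = "http://pai.flyrise.cn/dp-search-api/search"


def detail_url(record_id):
    """Build the getDetail URL for an id returned by the list query."""
    return f"{BASE_URL}/getDetail?{urlencode({'id': record_id})}"


def delete_url(table_id, table_name, buz_type):
    """Build the delete URL; urlencode percent-encodes non-ASCII values as UTF-8."""
    query = urlencode({
        "tableId": table_id,
        "tableName": table_name,
        "buzType": buz_type,
    })
    return f"{BASE_URL}/delete?{query}"


print(delete_url("1285124693107544064", "cons_staff", "通讯录"))
```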

Result:

{
    "flyrise": true,
    "msg": "",
    "status": 0
}

Create index (reserved)

URL: http://pai.flyrise.cn/dp-search-api/search/createIndex
Parameters:

{
    "buzType": "测试集",
    "comIdList": [],
    "comIds": "1111,222",
    "content": "测试内容",
    "createTime": "1598028191000",
    "fileType": "",
    "fromTypeList": [],
    "fromTypes": "WEB,APP",
    "iconUrl": "",
    "id": "121",
    "open": true,
    "peopleName": "",
    "publicTime": "1598028191000",
    "tableId": "1111111",
    "tableName": "test",
    "title": "测试标题",
    "updateTime": "1598028191000",
    "url": "",
    "userIdList": [],
    "userIds": "123,456"
}

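Parameter description:

{
    "buzType": "business type (business-defined)",
    "comIdList": ["comID1","comID2"],
    "comIds": "company IDs, comma-separated",
    "content": "content",
    "createTime": "creation time (must not be empty)",
    "fileType": "file type (fill in when the record is a file)",
    "fromTypeList": ["APP","WEB"],
    "fromTypes": "search source: APP/WEB",
    "iconUrl": "avatar link",
    "id": "ID generated by the snowflake algorithm",
    "open": "whether the record is public (true means public, with userIds empty; otherwise false)",
    "peopleName": "name shown with the avatar",
    "publicTime": "publish time (falls back to createTime when absent)",
    "tableId": "table ID",
    "tableName": "table name",
    "title": "title text",
    "updateTime": "update time (must not be empty)",
    "url": "link to the business detail page (business-defined)",
    "userIdList": ["userID1","userID2"],
    "userIds": "user IDs (comma-separated)",
    "fileAttachId":"stores the address of the internal OSS file component",
    "fileCode":"stores the address of the internal OSS file",
    "show":"business-logic flag: true if the record should be searchable, false if it should not be or should be deleted"
}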
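
As an illustration of how a caller might assemble a createIndex body, the sketch below fills the paired string/list fields together and sets open according to the rule above (public when there are no userIds). The function name, its parameters, and the choice to stamp all three time fields with the same value are assumptions; times are sent as epoch-millisecond strings as in the example request.

```python
import time


def build_index_document(doc_id, title, content, buz_type, table_name,
                         table_id, com_ids, user_ids=(),
                         from_types=("WEB", "APP"), now_ms=None):
    """Assemble a createIndex request body as a dict."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    stamp = str(now_ms)
    com_ids, user_ids, from_types = list(com_ids), list(user_ids), list(from_types)
    return {
        "id": doc_id,
        "title": title,
        "content": content,
        "buzType": buz_type,
        "tableName": table_name,
        "tableId": table_id,
        "url": "",
        "fileType": "",
        "iconUrl": "",
        "peopleName": "",
        # Public only when the record is not restricted to specific users.
        "open": not user_ids,
        "comIds": ",".join(com_ids),
        "comIdList": com_ids,
        "userIds": ",".join(user_ids),
        "userIdList": user_ids,
        "fromTypes": ",".join(from_types),
        "fromTypeList": from_types,
        "createTime": stamp,
        "publicTime": stamp,
        "updateTime": stamp,
    }
```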

Result:

{
  "status": 0,
  "msg": "SUCCESS",
  "flyrise": "1284004574981656576"
}

PS: creating the index manually in Kibana Dev Tools

PUT /company_msg

{
    "mappings" : {
      "properties" : {
        "buzType" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "comIdList" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "comIds" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "content" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "createTime" : {
          "type" : "long"
        },
        "fileType" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "fromTypeList" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "fromTypes" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "iconUrl" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "id" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "open" : {
          "type" : "boolean"
        },
        "publicTime" : {
          "type" : "long"
        },
        "tableId" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "tableName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "updateTime" : {
          "type" : "long"
        },
        "url" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "userIdList" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "userIds" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
}
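
The mapping above repeats one pattern: string fields are text with a keyword sub-field, the three time fields are long, and open is boolean. The sketch below regenerates the same structure from field lists, which can be easier to maintain than the hand-written JSON. The function and constant names are hypothetical; fields from the data dictionary that the mapping does not list (for example show or staffIds) are left out here too.

```python
import json

TEXT_FIELDS = [
    "buzType", "comIdList", "comIds", "content", "fileType",
    "fromTypeList", "fromTypes", "iconUrl", "id", "tableId",
    "tableName", "title", "url", "userIdList", "userIds",
]
LONG_FIELDS = ["createTime", "publicTime", "updateTime"]


def text_with_keyword():
    """A text field with an exact-match keyword sub-field."""
    return {
        "type": "text",
        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
    }


def build_mapping():
    """Return the body for PUT /company_msg."""
    properties = {name: text_with_keyword() for name in TEXT_FIELDS}
    properties.update({name: {"type": "long"} for name in LONG_FIELDS})
    properties["open"] = {"type": "boolean"}
    return {"mappings": {"properties": properties}}


print(json.dumps(build_mapping(), indent=2))
```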

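Use case (address book sync and search)

Data source sync configuration (using Logstash 7.17.2)

Install Logstash 7.17.2 with Docker

The configuration is as follows:

127.0.0.1 is for reference only; use the address actually assigned.

Create the directory

mkdir -p /data/logstash_dp/config/
cd /data/logstash_dp/config/

vi logstash.conf

Pipeline for syncing to ES (Logstash writes to Kafka; the jar service then writes to ES)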

input {
    stdin {
    }
    jdbc {
      # Database address and database name; sets the character encoding, disables SSL, enables auto-reconnect
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3308/pai-console?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
      # Username and password
      jdbc_user => "pai-console"
      jdbc_password => "xxxx"
      # The settings below enable incremental sync: after each run, the tracking_column value of the last row (updateTime here) is recorded in last_run_metadata_path, and the next run assigns it to sql_last_value
      # Validate the connection before use
      jdbc_validate_connection => true
      # How often to validate the connection (4 hours)
      jdbc_validation_timeout => 14400
      # Maximum number of retries after a connection failure
      connection_retry_attempts => 50
      # Wait time between retries after a connection failure
      connection_retry_attempts_wait_time => 1
      # Whether to record the last run; true saves the tracking_column value of the last run to the file given by last_run_metadata_path
      record_last_run => "true"
      # Set to true to track the value of a column from the query result; otherwise the timestamp of the last run is tracked
      use_column_value => "true"
      # Column to track for incremental sync; must be a column returned by the query
      tracking_column => "updateTime"
      tracking_column_type => "timestamp"
      # Where record_last_run stores the value from the last run
      last_run_metadata_path => "/usr/share/logstash/config/cons_staff_last_time"
      # Whether to clear the record in last_run_metadata_path; must be false for incremental sync
      clean_run => "false"
      # type => "wkcar"
      # Whether to convert column names to lowercase
      lowercase_column_names => false
      # Location of the driver jar
      jdbc_driver_library => "/usr/share/logstash/config/mysql-connector-java-8.0.20.jar"
      # MySQL driver class (with Connector/J 8.x the current name is com.mysql.cj.jdbc.Driver; this older name is deprecated but still accepted)
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #jdbc_default_timezone => "Asia/Shanghai"
      #jdbc_paging_enabled => "true"
      #jdbc_page_size => "100"
      # Note: this SQL must not return a column named type, which is a reserved field in ES
      statement => "select a.staff_id as tableId,a.staff_name as title,a.create_time as publicTime, a.create_time as createTime,a.update_time as updateTime,a.ent_id as comIds,a.staff_head as iconUrl,a.staff_status as `status` from cons_staff a  where update_time > :sql_last_value"
      # Sync schedule in cron syntax (minute hour day-of-month month day-of-week); this setting runs once every minute
      schedule => "*/1 * * * *"
    }
}
filter {
    # Mark the record as searchable only when status is the string '1'
    if [status] == '1' {
        mutate {
            replace => { "show" => "true" }
        }
    } else {
        mutate {
            replace => { "show" => "false" }
        }
    }
    # Shift @timestamp forward by 8 hours via a temporary timestamp field
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    # Fill in the data dictionary fields that are fixed for the address book
    mutate {
        replace => { "id" => "%{tableId}" }
    }
    mutate {
        replace => { "buzType" => "通讯录" }
    }
    mutate {
        replace => { "url" => "cons_staff&%{tableId}" }
    }
    mutate {
        replace => { "userIds" => "" }
    }
    mutate {
        replace => { "open" => "true" }
    }
    mutate {
        replace => { "tableName" => "cons_staff" }
    }
    mutate {
        replace => { "fileType" => "" }
    }
    mutate {
        replace => { "fromTypes" => "WEB,APP" }
    }
}
output {
    kafka {
        bootstrap_servers => "8.129.13.162:9092"    # Kafka broker address (Logstash is the producer)
        codec => json
        topic_id => "dp-search-dev"    # Kafka topic to write to
    }
    stdout {
        codec => json_lines
    }
}
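
To make the effect of the filter stage easier to follow, here is the same field logic written as a plain function over one database row. It is only a reading aid with a hypothetical function name, not something Logstash runs, and it omits the @timestamp shift.

```python
def apply_address_book_filter(row):
    """Mimic the mutate steps of the filter block for one cons_staff row."""
    event = dict(row)
    # Like the Logstash conditional, this compares against the string '1'.
    event["show"] = "true" if event.get("status") == "1" else "false"
    table_id = event["tableId"]
    event.update({
        "id": str(table_id),
        "buzType": "通讯录",
        "url": f"cons_staff&{table_id}",
        "userIds": "",
        "open": "true",
        "tableName": "cons_staff",
        "fileType": "",
        "fromTypes": "WEB,APP",
    })
    return event


row = {"tableId": "1285124693107544064", "title": "黄永露", "status": "1"}
print(apply_address_book_filter(row)["url"])
```

Every synced staff record is therefore public (open is "true", userIds is empty), and only the status column decides whether it is searchable.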

Start it as follows:

docker run -d \
    --name logstash770-dp \
    -p 6002:6000 \
    -e TZ=Asia/Shanghai \
    -e xpack.monitoring.elasticsearch.hosts=http://127.0.0.1:9200 \
    -e xpack.monitoring.enabled=true \
    -v /data/logstash_dp/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
    docker.elastic.co/logstash/logstash:7.7.0

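Download the MySQL driver and copy it into the Docker container

docker cp /data/logstash_dp/config/mysql-connector-java-8.0.20.jar logstash770-dp:/usr/share/logstash/config/

Download URL:
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/

Other notes:

View the logs

docker logs -f --tail 10 logstash770-dp

Enter the logstash770-dp container

docker exec -it logstash770-dp /bin/bash

Database table changes are currently detected by comparing the updateTime timestamp, so make sure updateTime is not empty whenever data is updated.
The default sort order is descending by the publicTime timestamp, so also make sure publicTime is not empty when updating data (when there is no publicTime, createTime is used).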
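
The reason an empty updateTime is a problem can be seen in a small simulation of the incremental query (where update_time > :sql_last_value). This is a sketch with a hypothetical function name and in-memory rows, not the Logstash implementation.

```python
def fetch_incremental(rows, last_value):
    """Return the rows changed since last_value and the new checkpoint.

    Rows whose update_time is None never satisfy the comparison,
    just as NULL > x is never true in SQL, so they are never synced.
    """
    changed = sorted(
        (r for r in rows
         if r["update_time"] is not None and r["update_time"] > last_value),
        key=lambda r: r["update_time"],
    )
    new_last = changed[-1]["update_time"] if changed else last_value
    return changed, new_last


rows = [
    {"id": 1, "update_time": 10},
    {"id": 2, "update_time": 20},
    {"id": 3, "update_time": None},  # silently skipped on every run
]
changed, checkpoint = fetch_incremental(rows, 10)
print([r["id"] for r in changed], checkpoint)
```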

Consuming data from Kafka and syncing it to ES

127.0.0.1 is for reference only; use the address actually assigned.

Kafka resource address:

kafka:
  servers: 127.0.0.1:9092
  topic: dp-search-dev
  groupid: search-dev-group-id

ES resource address:

es:
  host: 127.0.0.1
  port: 9200
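
The jar service itself is not shown in this document. As a rough sketch of the step between the two resources, the function below turns one Kafka message (the JSON produced by Logstash) into lines for the Elasticsearch _bulk API. The function name is hypothetical, the index name is taken from the PUT /company_msg example, and treating show=false as a delete is an assumption; the service may instead keep the document and filter on show.

```python
import json

INDEX_NAME = "company_msg"  # taken from the PUT /company_msg example


def to_bulk_lines(message, index=INDEX_NAME):
    """Convert one Kafka message into NDJSON lines for the _bulk API."""
    doc = json.loads(message)
    if "id" not in doc:
        raise ValueError("message has no id field")
    # Assumption: records flagged show=false are removed from the index.
    if str(doc.get("show")).lower() == "false":
        return [json.dumps({"delete": {"_index": index, "_id": doc["id"]}})]
    return [
        json.dumps({"index": {"_index": index, "_id": doc["id"]}}),
        json.dumps(doc, ensure_ascii=False),
    ]


for line in to_bulk_lines(b'{"id": "1", "title": "t", "show": "true"}'):
    print(line)
```

Using the record id as _id makes re-delivery of the same message overwrite the existing document instead of creating a duplicate.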

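Description of the object synced to ES:

Data object: the same as the data dictionary definition and the address book example given earlier in this document.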

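Search notes:

Searching by multiple comIds and multiple userIds is currently supported; results are sorted by publicTime descending and tableId ascending.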
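
The query the service actually sends to Elasticsearch is not shown here. The sketch below is one way such a search could be expressed in the Elasticsearch query DSL: a multi_match on the fields selected by queryType, a filter on several company IDs, a visibility rule (public, or listed in userIdList), and the sort order described above. The function name, the visibility rule, and the use of the .keyword sub-fields are assumptions.

```python
FIELDS_BY_QUERY_TYPE = {
    1: ["title", "content"],
    2: ["title"],
    3: ["content"],
}


def build_search_body(query_str, com_ids, user_ids=(), query_type=1,
                      page=0, page_size=10):
    """Build a search request body for the company_msg index."""
    visibility = [{"term": {"open": True}}]
    if user_ids:
        visibility.append({"terms": {"userIdList.keyword": list(user_ids)}})
    return {
        "from": page * page_size,
        "size": page_size,
        "query": {
            "bool": {
                "must": [{
                    "multi_match": {
                        "query": query_str,
                        "fields": FIELDS_BY_QUERY_TYPE[query_type],
                    }
                }],
                "filter": [
                    {"terms": {"comIdList.keyword": list(com_ids)}},
                    {"bool": {"should": visibility, "minimum_should_match": 1}},
                ],
            }
        },
        "sort": [
            {"publicTime": {"order": "desc"}},
            {"tableId.keyword": {"order": "asc"}},
        ],
    }
```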

Document last updated: 2022-11-29 11:32   Author: 伍润源