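Feature overview
Architecture:
Table data: Logstash > Kafka > jar > Elasticsearch > Kibana
a. The business side (or the data maintainers) configures the Logstash parameters as required for the tables whose data needs to be indexed, then runs the Logstash service; the data is pushed to Kafka.
b. The jar service consumes the Kafka data, validates and cleans it, and syncs it to Elasticsearch.
c. Kibana is used to query the data in Elasticsearch, and standardized search APIs are provided to the business side according to the results it needs.
Files: OSS > Kafka > jar > Elasticsearch > Kibana
a. The business side (or the data maintainers) configures the Logstash parameters as required for the table data that holds the OSS file addresses to be indexed, then runs the Logstash service; the data is pushed to Kafka.
b. The jar service incrementally fetches the OSS files, reads their contents, and syncs them to Elasticsearch according to the data specification.
c. Kibana is used to query the data in Elasticsearch, and standardized search APIs are provided to the business side according to the results it needs.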
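Data and interface specification
Data dictionary
Definition:
{
"id":"ID generated by the snowflake algorithm",
"title":"title text",
"content":"detail text",
"buzType":"business type (can be customized by the business)",
"url":"link to the business detail page (can be customized by the business)",
"userIds":"user IDs (comma-separated if more than one)",
"open":"whether the record is public (true means public, with userIds empty; otherwise false)",
"comIds":"company IDs (comma-separated if more than one)",
"tableName":"table name",
"tableId":"table ID",
"fileType":"file type (fill in when the record is a file)",
"publicTime":"publish time (falls back to createTime when absent)",
"createTime":"creation time (must not be empty)",
"updateTime":"update time (must not be empty)",
"iconUrl":"avatar link",
"peopleName":"name of the person shown with the avatar",
"fromTypes":"search source: APP/WEB",
"fromTypeList":["APP","WEB"],
"userIdList":["userID1","userID2"],
"comIdList":["comId1","comId2"],
"fileAttachId":"stores the file component address in the internal OSS",
"fileCode":"stores the file address in the internal OSS",
"show":"business-logic flag: true if the record should be searchable, false if it should not be searchable or should be deleted",
"staffIds": "staff IDs (comma-separated if more than one)",
"staffIdList": ["staffId1","staffId2"],
"staffPostIds": "staff post IDs (comma-separated if more than one)",
"staffPostIdList": ["staffPostId1","staffPostId2"],
"actionCode": "spare field: unique identifier used to jump to the business detail"
}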
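The relationship between the comma-separated fields and their list counterparts can be sketched in Python as below. This is a minimal sketch, not the jar service's actual code: the names `LIST_FIELDS`, `split_ids` and `normalize_record` are hypothetical, and it assumes that each `*List` field is derived by splitting the matching comma-separated field.

```python
from typing import Any, Dict, List

# (comma-separated field, list field) pairs taken from the data dictionary.
# Assumption: each list field is derived from its comma-separated counterpart.
LIST_FIELDS = [
    ("userIds", "userIdList"),
    ("comIds", "comIdList"),
    ("fromTypes", "fromTypeList"),
    ("staffIds", "staffIdList"),
    ("staffPostIds", "staffPostIdList"),
]


def split_ids(value: Any) -> List[str]:
    """Split a comma-separated string into a list, dropping empty items."""
    if not value:
        return []
    return [item.strip() for item in str(value).split(",") if item.strip()]


def normalize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with list fields and publicTime filled in."""
    for required in ("createTime", "updateTime"):
        if not record.get(required):
            raise ValueError(f"{required} must not be empty")
    doc = dict(record)
    for source, target in LIST_FIELDS:
        doc[target] = split_ids(doc.get(source))
    # publicTime falls back to createTime when it is missing.
    if not doc.get("publicTime"):
        doc["publicTime"] = doc["createTime"]
    return doc
```

A record with `"comIds": "1111,222"` would come out with `comIdList` equal to `["1111", "222"]`, and a record without `publicTime` would reuse its `createTime`.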
Address book example:
{
"id": "1296358706270834688",
"title": "黄永露",
"content": "黄永露 13711902104 test@test.com",
"buzType": "通讯录",
"url": "cons_staff&1285124693107544064",
"userIds": "",
"open": true,
"comIds": "1284004574981656576",
"tableName": "cons_staff",
"tableId": "1285124693107544064",
"fileType": "",
"publicTime": "2020-08-19T13:42:20.000+0000",
"createTime": "2020-08-19T13:42:20.000+0000",
"updateTime": "2020-08-21T09:10:04.000+0000",
"iconUrl": null,
"peopleName": null,
"userIdList": null,
"fromTypeList": [
"WEB",
"APP"
],
"comIdList": [
"1284004574981656576"
]
}
Query specification
Query business type list
URL: http://pai.flyrise.cn/dp-search-api/search/searchBuzTypeList
Parameters:
{
"buzType": "",
"comId": "1290527607439364096",
"fromType": "",
"fileType": "",
"page": 0,
"pageSize": 0,
"queryStr": "二",
"queryType": 1,
"userId": "",
"staffId": ""
}
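Parameter description:
{
"buzType": "business type (leave empty)",
"comId": "company ID, comma-separated if more than one (must not be empty)",
"fromType": "query source: WEB or APP",
"fileType": "file type to filter by when the record is a file",
"page": "page number, starting from 0 (set to 0)",
"pageSize": "number of items per page (set to 0)",
"queryStr": "query text (must not be empty)",
"queryType": "query type: 1 searches title and content, 2 searches title only, 3 searches content only (must not be empty)",
"userId": "user ID, comma-separated if more than one",
"staffId": "staff ID, comma-separated if more than one",
"staffPostIds": "staff post ID, comma-separated if more than one"
}
Result: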
{
"status": 0,
"msg": "SUCCESS",
"flyrise": [
{
"buzType": "测试集",
"value": 1
},
{
"buzType": "通讯录",
"value": 1
}
]
}
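A request body for this interface could be assembled as in the following Python sketch. It is illustrative only: the function name `build_search_request` is hypothetical, and the use of POST with a JSON body is an assumption, since the HTTP method is not stated here. The request is built but not sent.

```python
import json
import urllib.request

BASE_URL = "http://pai.flyrise.cn/dp-search-api/search"


def build_search_request(endpoint: str, com_id: str, query_str: str,
                         query_type: int = 1, page: int = 0,
                         page_size: int = 0) -> urllib.request.Request:
    """Build (but do not send) a JSON request for a search endpoint."""
    # comId, queryStr and queryType are the fields marked "must not be empty".
    if not com_id or not query_str:
        raise ValueError("comId and queryStr must not be empty")
    if query_type not in (1, 2, 3):
        raise ValueError("queryType must be 1, 2 or 3")
    body = {
        "buzType": "", "comId": com_id, "fromType": "", "fileType": "",
        "page": page, "pageSize": page_size,
        "queryStr": query_str, "queryType": query_type,
        "userId": "", "staffId": "",
    }
    data = json.dumps(body, ensure_ascii=False).encode("utf-8")
    # Assumption: the API accepts POST with a JSON body.
    return urllib.request.Request(
        f"{BASE_URL}/{endpoint}", data=data,
        headers={"Content-Type": "application/json"}, method="POST")


request = build_search_request("searchBuzTypeList", "1290527607439364096", "二")
print(request.get_method(), request.full_url)
```

Passing the returned object to `urllib.request.urlopen` would send it; the same helper could serve the list query by changing `endpoint` and `page_size`.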
List query specification
URL: http://pai.flyrise.cn/dp-search-api/search/searchList
Parameters:
{
"buzType": "",
"comId": "1284004574981656576",
"fromType": "",
"fileType": "",
"page": 0,
"pageSize": 10,
"queryStr": "黄永露",
"queryType": 1,
"userId": ""
}
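Parameter description:
{
"buzType": "business type",
"comId": "company ID, comma-separated if more than one (must not be empty)",
"fromType": "query source: WEB or APP",
"fileType": "file type to filter by when the record is a file",
"page": "page number, starting from 0 (must not be empty)",
"pageSize": "number of items per page (must not be empty)",
"queryStr": "query text (must not be empty)",
"queryType": "query type: 1 searches title and content, 2 searches title only, 3 searches content only (must not be empty)",
"userId": "user ID, comma-separated if more than one",
"staffId": "staff ID, comma-separated if more than one",
"staffPostIds": "staff post ID, comma-separated if more than one"
}
Result: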
{
"status": 0,
"msg": "SUCCESS",
"flyrise": {
"total": 1,
"row": [
{
"id": "1296358706270834688",
"title": "黄永露",
"content": "黄永露 13711902104 test@test.com",
"buzType": "通讯录",
"url": "cons_staff&1285124693107544064",
"userIds": "",
"open": true,
"comIds": "1284004574981656576",
"tableName": "cons_staff",
"tableId": "1285124693107544064",
"fileType": "",
"publicTime": "2020-08-19T13:42:20.000+0000",
"createTime": "2020-08-19T13:42:20.000+0000",
"updateTime": "2020-08-21T09:10:04.000+0000",
"iconUrl": null,
"peopleName": null,
"userIdList": null,
"comIdList": [
"1284004574981656576"
]
}
]
}
}
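The result structure above could be unpacked on the caller's side roughly as follows. This is a sketch under assumptions: `parse_search_list` is a hypothetical helper, and treating any non-zero `status` as a failure is inferred from the sample (status 0 with msg SUCCESS) rather than stated.

```python
import json
from typing import Any, Dict, List, Tuple


def parse_search_list(response_text: str) -> Tuple[int, List[Dict[str, Any]]]:
    """Return (total, rows) from a searchList response body."""
    payload = json.loads(response_text)
    # Assumption: status 0 means success, anything else is an error.
    if payload.get("status") != 0:
        raise RuntimeError(f"search failed: {payload.get('msg')}")
    result = payload.get("flyrise") or {}
    return result.get("total", 0), result.get("row") or []


# A shortened copy of the sample result shown above.
sample = json.dumps({
    "status": 0,
    "msg": "SUCCESS",
    "flyrise": {
        "total": 1,
        "row": [{"id": "1296358706270834688", "title": "黄永露", "buzType": "通讯录"}],
    },
})

total, rows = parse_search_list(sample)
for row in rows:
    print(row["id"], row["title"], row["buzType"])
```

The `id` of each row is the value the detail interface expects.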
Detail query specification
URL: http://pai.flyrise.cn/dp-search-api/search/getDetail
Parameters:
id=1296358706270834688
Parameter description:
id is the id returned by the list interface
Result:
{
"status": 0,
"msg": "SUCCESS",
"flyrise": {
"id": "1296358706270834688",
"title": "黄永露",
"content": "黄永露 13711902104 test@test.com",
"buzType": "通讯录",
"url": "cons_staff&1285124693107544064",
"userIds": "",
"open": true,
"comIds": "1284004574981656576",
"tableName": "cons_staff",
"tableId": "1285124693107544064",
"fileType": "",
"publicTime": "2020-08-19T13:42:20.000+0000",
"createTime": "2020-08-19T13:42:20.000+0000",
"updateTime": "2020-08-21T09:10:04.000+0000",
"iconUrl": null,
"peopleName": null,
"userIdList": null,
"comIdList": [
"1284004574981656576"
]
}
}
Index deletion specification
URL: http://pai.flyrise.cn/dp-search-api/search/delete
Parameters:
tableId=1285124693107544064
tableName=cons_staff
buzType=通讯录
Parameter description:
tableId=ID of the record in the business table
tableName=business table name
buzType=business type
Result:
{
"flyrise": true,
"msg": "",
"status": 0
}
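Because `buzType` may hold Chinese text, the delete parameters need to be percent-encoded when they travel in a URL. The Python sketch below shows one way to do that; `build_delete_query` is a hypothetical helper, and sending the parameters as a query string is an assumption, since the transport is not specified here.

```python
from urllib.parse import parse_qs, urlencode

DELETE_URL = "http://pai.flyrise.cn/dp-search-api/search/delete"


def build_delete_query(table_id: str, table_name: str, buz_type: str) -> str:
    """Encode the three delete parameters as a UTF-8 percent-encoded query string."""
    return urlencode({
        "tableId": table_id,
        "tableName": table_name,
        "buzType": buz_type,
    })


query = build_delete_query("1285124693107544064", "cons_staff", "通讯录")
# Assumption: the parameters are appended to the URL as a query string.
print(f"{DELETE_URL}?{query}")
```

`urlencode` encodes non-ASCII values as UTF-8 by default, so the receiving side decodes `buzType` back to the original text.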
Create index (reserved)
URL: http://pai.flyrise.cn/dp-search-api/search/createIndex
Parameters:
{
"buzType": "测试集",
"comIdList": [],
"comIds": "1111,222",
"content": "测试内容",
"createTime": "1598028191000",
"fileType": "",
"fromTypeList": [],
"fromTypes": "WEB,APP",
"iconUrl": "",
"id": "121",
"open": true,
"peopleName": "",
"publicTime": "1598028191000",
"tableId": "1111111",
"tableName": "test",
"title": "测试标题",
"updateTime": "1598028191000",
"url": "",
"userIdList": [],
"userIds": "123,456"
}
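Parameter description:
{
"buzType": "business type (customized by the business)",
"comIdList": ["comID1","comID2"],
"comIds": "company IDs, comma-separated if more than one",
"content": "content",
"createTime": "creation time (must not be empty)",
"fileType": "file type (fill in when the record is a file)",
"fromTypeList": ["APP","WEB"],
"fromTypes": "search source: APP/WEB",
"iconUrl": "avatar link",
"id": "ID generated by the snowflake algorithm",
"open": "whether the record is public (true means public, with userIds empty; otherwise false)",
"peopleName": "name of the person shown with the avatar",
"publicTime": "publish time (falls back to createTime when absent)",
"tableId": "table ID",
"tableName": "table name",
"title": "title text",
"updateTime": "update time (must not be empty)",
"url": "link to the business detail page (can be customized by the business)",
"userIdList": ["userID1","userID2"],
"userIds": "user IDs (comma-separated if more than one)",
"fileAttachId":"stores the file component address in the internal OSS",
"fileCode":"stores the file address in the internal OSS",
"show":"business-logic flag: true if the record should be searchable, false if it should not be searchable or should be deleted"
}
Result: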
{
"status": 0,
"msg": "SUCCESS",
"flyrise": "1284004574981656576"
}
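The time fields in the sample parameters above are strings such as "1598028191000". Reading them as epoch milliseconds is an inference from the sample and from the `long` mapping type, not something stated explicitly. Under that assumption, a value could be produced with the hypothetical helper below.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> str:
    """Convert a timezone-aware datetime to an epoch-milliseconds string."""
    if moment.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    # Integer division of timedeltas avoids floating-point rounding.
    return str((moment - EPOCH) // timedelta(milliseconds=1))


created = datetime(2020, 8, 21, 16, 43, 11, tzinfo=timezone.utc)
print(to_epoch_millis(created))  # prints 1598028191000
```

Requiring a timezone-aware value keeps the result independent of the server's local time zone.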
PS: creating the index manually in Kibana Dev Tools
PUT /company_msg
{
"mappings" : {
"properties" : {
"buzType" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"comIdList" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"comIds" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"content" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"createTime" : {
"type" : "long"
},
"fileType" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"fromTypeList" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"fromTypes" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"iconUrl" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"id" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"open" : {
"type" : "boolean"
},
"publicTime" : {
"type" : "long"
},
"tableId" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"tableName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"title" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"updateTime" : {
"type" : "long"
},
"url" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"userIdList" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"userIds" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
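The mapping above repeats one pattern (a `text` field with a `keyword` sub-field) for most properties. As an illustration, the same request body could be generated from three field lists in Python; the names `TEXT_FIELDS`, `LONG_FIELDS`, `BOOLEAN_FIELDS` and `build_mapping` are hypothetical, and the sketch only builds the JSON without talking to Elasticsearch.

```python
import json
from typing import Any, Dict

# Field names copied from the company_msg mapping.
TEXT_FIELDS = [
    "buzType", "comIdList", "comIds", "content", "fileType",
    "fromTypeList", "fromTypes", "iconUrl", "id", "tableId",
    "tableName", "title", "url", "userIdList", "userIds",
]
LONG_FIELDS = ["createTime", "publicTime", "updateTime"]
BOOLEAN_FIELDS = ["open"]


def text_with_keyword() -> Dict[str, Any]:
    """A text field with a keyword sub-field, as used throughout the mapping."""
    return {
        "type": "text",
        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
    }


def build_mapping() -> Dict[str, Any]:
    """Build the body for PUT /company_msg."""
    properties: Dict[str, Any] = {}
    for name in TEXT_FIELDS:
        properties[name] = text_with_keyword()
    for name in LONG_FIELDS:
        properties[name] = {"type": "long"}
    for name in BOOLEAN_FIELDS:
        properties[name] = {"type": "boolean"}
    # Sort by field name to match the order of the hand-written mapping.
    return {"mappings": {"properties": dict(sorted(properties.items()))}}


print(json.dumps(build_mapping(), indent=2))
```

The printed JSON can be pasted after `PUT /company_msg`, and adding a field then only means extending one of the lists.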
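Use case (address book sync and search)
Data source sync configuration (using logstash 7.17.2)
Installing logstash 7.17.2 with Docker
The configuration is as follows:
127.0.0.1 is for reference only; use the address actually assigned.
Create the directory
mkdir -p /data/logstash_dp/config/
cd /data/logstash_dp/config/
vi logstash.conf
Sync to ES (contents of logstash.conf)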
input {
stdin{
}
jdbc {
# Database address and database name; sets the character encoding, disables SSL, and enables auto-reconnect
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3308/pai-console?serverTimezone=UTC&characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
# Username and password
jdbc_user => "pai-console"
jdbc_password => "xxxx"
# The settings below enable incremental sync: after each run, the tracking_column value of the last row (updateTime here) is written to last_run_metadata_path, and the next run assigns that value to sql_last_value
# Validate the connection before use
jdbc_validate_connection => true
# How often to validate the connection (4 hours)
jdbc_validation_timeout => 14400
# Maximum number of retries after a connection failure
connection_retry_attempts => 50
# Wait time between retries after a connection failure
connection_retry_attempts_wait_time => 1
# Whether to record the last run; true saves the tracking_column value of the last run to the file given by last_run_metadata_path
record_last_run => "true"
# Set to true to track the value of a column in the query result; otherwise the time of the last run is tracked
use_column_value => "true"
# Column to track for incremental sync; must be a column returned by the query
tracking_column => "updateTime"
tracking_column_type => "timestamp"
# Where record_last_run stores the last value
last_run_metadata_path => "/usr/share/logstash/config/cons_staff_last_time"
# Whether to clear the record in last_run_metadata_path; must be false for incremental sync
clean_run => "false"
# type => "wkcar"
# Whether to convert column names to lowercase
lowercase_column_names => false
# Location of the driver jar
jdbc_driver_library => "/usr/share/logstash/config/mysql-connector-java-8.0.20.jar"
# MySQL driver class
jdbc_driver_class => "com.mysql.jdbc.Driver"
#jdbc_default_timezone => "Asia/Shanghai"
#jdbc_paging_enabled => "true"
#jdbc_page_size => "100"
# Note: this SQL must not contain a column named type, which is a reserved field in ES
statement => "select a.staff_id as tableId,a.staff_name as title,a.create_time as publicTime, a.create_time as createTime,a.update_time as updateTime,a.ent_id as comIds,a.staff_head as iconUrl,a.staff_status as `status` from cons_staff a where update_time > :sql_last_value"
# Sync frequency as a cron expression (minute hour day-of-month month day-of-week); the current setting runs every minute
schedule => "*/1 * * * *"
}
}
filter {
if [status] == '1' {
mutate {
replace => { "show" => "true" }
}
} else {
mutate {
replace => { "show" => "false" }
}
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
mutate {
replace => { "id" => "%{tableId}" }
}
mutate {
replace => { "buzType" => "通讯录" }
}
mutate {
replace => { "url" => "cons_staff&%{tableId}" }
}
mutate {
replace => { "userIds" => "" }
}
mutate {
replace => { "open" => "true" }
}
mutate {
replace => { "tableName" => "cons_staff" }
}
mutate {
replace => { "fileType" => "" }
}
mutate {
replace => { "fromTypes" => "WEB,APP" }
}
}
output {
kafka {
bootstrap_servers => "8.129.13.162:9092" # producer
codec => json
topic_id => "dp-search-dev" # Kafka topic to write to
}
stdout {
codec => json_lines
}
}
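To make the effect of the filter stage easier to follow, the Python sketch below mirrors its mutate steps on a single row returned by the SQL statement. It is an illustration only: `apply_filter` is a hypothetical name, the @timestamp shift done by the ruby filters is left out, and the sample row values are invented for the example.

```python
from typing import Any, Dict


def apply_filter(row: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror the mutate steps of the filter block for one JDBC row."""
    event = dict(row)
    # Same string comparison as `if [status] == '1'` in the config.
    event["show"] = "true" if row.get("status") == "1" else "false"
    event["id"] = f"{row['tableId']}"
    event["buzType"] = "通讯录"
    event["url"] = f"cons_staff&{row['tableId']}"
    event["userIds"] = ""
    event["open"] = "true"
    event["tableName"] = "cons_staff"
    event["fileType"] = ""
    event["fromTypes"] = "WEB,APP"
    return event


# Invented sample row using the column aliases from the SQL statement.
row = {
    "tableId": "1285124693107544064",
    "title": "黄永露",
    "comIds": "1284004574981656576",
    "status": "1",
}
event = apply_filter(row)
print(event["id"], event["url"], event["show"])
```

The resulting event carries the fixed address book values (`buzType`, `tableName`, `fromTypes`) together with the columns selected from `cons_staff`, which is the shape pushed to the Kafka topic.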
Start it as follows:
docker run -d \
--name logstash770-dp \
-p 6002:6000 \
-e TZ=Asia/Shanghai \
-e xpack.monitoring.elasticsearch.hosts=http://127.0.0.1:9200 \
-e xpack.monitoring.enabled=true \
-v /data/logstash_dp/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
docker.elastic.co/logstash/logstash:7.7.0
Download the MySQL driver and copy it into the Docker container
docker cp /data/logstash_dp/config/mysql-connector-java-8.0.20.jar logstash770-dp:/usr/share/logstash/config/
Download URL:
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/
Other notes:
View the logs
docker logs -f --tail 10 logstash770-dp
Enter the logstash770-dp container
docker exec -it logstash770-dp /bin/bash
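Database table data is currently synced by comparing the updateTime timestamp, so make sure updateTime is not empty whenever data is updated.
Results are sorted by the publicTime timestamp in descending order by default, so also make sure publicTime is not empty when updating data (if there is no publicTime, use createTime).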
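Why an empty updateTime is a problem can be seen from a small simulation of the incremental query `update_time > :sql_last_value`. This Python sketch is a simplification under assumptions: `incremental_batch` is a hypothetical name, and it keeps the maximum updateTime of the batch as the next tracking value, whereas Logstash records the value of the last row returned.

```python
from datetime import datetime
from typing import Any, Dict, List, Tuple


def incremental_batch(rows: List[Dict[str, Any]],
                      last_value: datetime) -> Tuple[List[Dict[str, Any]], datetime]:
    """Return the rows changed since last_value and the next tracking value."""
    # Like the SQL comparison, a missing updateTime never matches.
    changed = [
        row for row in rows
        if row["updateTime"] is not None and row["updateTime"] > last_value
    ]
    next_value = max((row["updateTime"] for row in changed), default=last_value)
    return changed, next_value


rows = [
    {"tableId": "1", "updateTime": datetime(2020, 8, 21, 9, 10, 4)},
    {"tableId": "2", "updateTime": datetime(2020, 8, 19, 13, 42, 20)},
    {"tableId": "3", "updateTime": None},  # never picked up
]
changed, next_value = incremental_batch(rows, datetime(2020, 8, 20))
print([row["tableId"] for row in changed], next_value)
```

The row with no updateTime is skipped on every run, so a change to it would never reach Kafka.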
Consuming data from Kafka and syncing it to ES
127.0.0.1 is for reference only; use the address actually assigned.
Kafka resource address:
kafka:
servers: 127.0.0.1:9092
topic: dp-search-dev
groupid: search-dev-group-id
ES resource address:
es:
host: 127.0.0.1
port: 9200
Description of the object synced to ES:
Data object: identical to the data dictionary (definition and address book example) given earlier in this document.
Search notes:
Searching by multiple comIds and multiple userIds is currently supported; results are sorted by publicTime descending / tableId ascending.
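The sort order can be illustrated with a short Python sketch. It assumes that tableId ascending acts as the tie-breaker when publicTime values are equal, which is one reading of the note above, and that publicTime strings share one format so they compare correctly as text; `sort_results` is a hypothetical name.

```python
from typing import Any, Dict, List


def sort_results(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sort by publicTime descending, then tableId ascending for ties."""
    # Python's sort is stable, so sort by the secondary key first.
    by_table_id = sorted(rows, key=lambda row: row["tableId"])
    return sorted(by_table_id, key=lambda row: row["publicTime"], reverse=True)


rows = [
    {"tableId": "2", "publicTime": "2020-08-19T13:42:20.000+0000"},
    {"tableId": "1", "publicTime": "2020-08-19T13:42:20.000+0000"},
    {"tableId": "3", "publicTime": "2020-08-21T09:10:04.000+0000"},
]
print([row["tableId"] for row in sort_results(rows)])  # prints ['3', '1', '2']
```

In Elasticsearch itself the equivalent would be a two-key sort clause on the query; the sketch only shows the ordering a caller should expect.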