共计 9393 个字符,预计需要花费 24 分钟才能阅读完成。
一、简介
ElasticSearch 和 Solr 都是基于 Lucene 的搜索引擎,不过 ElasticSearch 天生支持分布式,而 Solr 是 4.0 版本后的 SolrCloud 才是分布式版本,Solr 的分布式支持需要 ZooKeeper 的支持。
这里有一个详细的 ElasticSearch 和 Solr 的对比:http://solr-vs-elasticsearch.com/
二、基本用法
Elasticsearch 集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是 NoSQL 的一种吧。
ES 比传统关系型数据库,对一些概念上的理解:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
从创建一个 Client 到添加、删除、查询等基本用法:
1、创建 Client
public ElasticSearchService(String ipAddress, int port) {
client = new TransportClient()
.addTransportAddress(new InetSocketTransportAddress(ipAddress,
port));
}
这里是一个 TransportClient。
ES 下两种客户端对比:
TransportClient:轻量级的 Client,使用 Netty 线程池,Socket 连接到 ES 集群。本身不加入到集群,只作为请求的处理。
Node Client:客户端节点本身也是 ES 节点,加入到集群,和其他 ElasticSearch 节点一样。频繁的开启和关闭这类 Node Clients 会在集群中产生“噪音”。
2、创建 / 删除 Index 和 Type 信息
// 创建索引
public void createIndex() {
client.admin().indices().create(new CreateIndexRequest(IndexName))
.actionGet();
}
// 清除所有索引
public void deleteIndex() {
IndicesExistsResponse indicesExistsResponse = client.admin().indices()
.exists(new IndicesExistsRequest(new String[] {IndexName}))
.actionGet();
if (indicesExistsResponse.isExists()) {
client.admin().indices().delete(new DeleteIndexRequest(IndexName))
.actionGet();
}
}
// 删除 Index 下的某个 Type
public void deleteType(){
client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
}
// 定义索引的映射类型
public void defineIndexTypeMapping() {
try {
XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
mapBuilder.startObject()
.startObject(TypeName)
.startObject(“properties”)
.startObject(IDFieldName).field(“type”, “long”).field(“store”, “yes”).endObject()
.startObject(SeqNumFieldName).field(“type”, “long”).field(“store”, “yes”).endObject()
.startObject(IMSIFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
.startObject(IMEIFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
.startObject(DeviceIDFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
.startObject(OwnAreaFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
.startObject(TeleOperFieldName).field(“type”, “string”).field(“index”, “not_analyzed”).field(“store”, “yes”).endObject()
.startObject(TimeFieldName).field(“type”, “date”).field(“store”, “yes”).endObject()
.endObject()
.endObject()
.endObject();
PutMappingRequest putMappingRequest = Requests
.putMappingRequest(IndexName).type(TypeName)
.source(mapBuilder);
client.admin().indices().putMapping(putMappingRequest).actionGet();
} catch (IOException e) {
log.error(e.toString());
}
}
这里自定义了某个 Type 的索引映射(Mapping),默认 ES 会自动处理数据类型的映射:针对整型映射为 long,浮点数为 double,字符串映射为 string,时间为 date,true 或 false 为 boolean。
注意:针对字符串,ES 默认会做“analyzed”处理,即先做分词、去掉 stop words 等处理再 index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field(“index”, “not_analyzed”)。
详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html
3、索引数据
// 批量索引数据
public void indexHotSpotDataList(List<Hotspotdata> dataList) {
if (dataList != null) {
int size = dataList.size();
if (size > 0) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < size; ++i) {
Hotspotdata data = dataList.get(i);
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
bulkRequest.add(client
.prepareIndex(IndexName, TypeName,
data.getId().toString())
.setRefresh(true).setSource(jsonSource));
}
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
Iterator<BulkItemResponse> iter = bulkResponse.iterator();
while (iter.hasNext()) {
BulkItemResponse itemResponse = iter.next();
if (itemResponse.isFailed()) {
log.error(itemResponse.getFailureMessage());
}
}
}
}
}
}
// 索引数据
public boolean indexHotspotData(Hotspotdata data) {
String jsonSource = getIndexDataFromHotspotData(data);
if (jsonSource != null) {
IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
TypeName).setRefresh(true);
requestBuilder.setSource(jsonSource)
.execute().actionGet();
return true;
}
return false;
}
// 得到索引字符串
public String getIndexDataFromHotspotData(Hotspotdata data) {
String jsonString = null;
if (data != null) {
try {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject().field(IDFieldName, data.getId())
.field(SeqNumFieldName, data.getSeqNum())
.field(IMSIFieldName, data.getImsi())
.field(IMEIFieldName, data.getImei())
.field(DeviceIDFieldName, data.getDeviceID())
.field(OwnAreaFieldName, data.getOwnArea())
.field(TeleOperFieldName, data.getTeleOper())
.field(TimeFieldName, data.getCollectTime())
.endObject();
jsonString = jsonBuilder.string();
} catch (IOException e) {
log.equals(e);
}
}
return jsonString;
}
ES 支持批量和单个数据索引。
4、查询获取数据
// 获取少量数据 100 个
private List<Integer> getSearchData(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setTypes(TypeName).setQuery(queryBuilder).setSize(100)
.execute().actionGet();
SearchHits searchHits = searchResponse.getHits();
for (SearchHit searchHit : searchHits) {
Integer id = (Integer) searchHit.getSource().get(“id”);
ids.add(id);
}
return ids;
}
// 获取大量数据
private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
List<Integer> ids = new ArrayList<>();
// 一次获取 100000 数据
SearchResponse scrollResp = client.prepareSearch(IndexName)
.setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
.setQuery(queryBuilder).setSize(100000).execute().actionGet();
while (true) {
for (SearchHit searchHit : scrollResp.getHits().getHits()) {
Integer id = (Integer) searchHit.getSource().get(IDFieldName);
ids.add(id);
}
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000)).execute().actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
}
return ids;
}
这里的 QueryBuilder 是一个查询条件,ES 支持分页查询获取数据,也可以一次性获取大量数据,需要使用 Scroll Search。
5、聚合(Aggregation Facet)查询
// 得到某段时间内设备列表上每个设备的数据分布情况 < 设备 ID,数量 >
public Map<String, String> getDeviceDistributedInfo(String startTime,
String endTime, List<String> deviceList) {
Map<String, String> resultsMap = new HashMap<>();
QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(deviceQueryBuilder).must(rangeBuilder);
TermsBuilder termsBuilder = AggregationBuilders.terms(“DeviceIDAgg”).size(Integer.MAX_VALUE)
.field(DeviceIDFieldName);
SearchResponse searchResponse = client.prepareSearch(IndexName)
.setQuery(queryBuilder).addAggregation(termsBuilder)
.execute().actionGet();
Terms terms = searchResponse.getAggregations().get(“DeviceIDAgg”);
if (terms != null) {
for (Terms.Bucket entry : terms.getBuckets()) {
resultsMap.put(entry.getKey(),
String.valueOf(entry.getDocCount()));
}
}
return resultsMap;
}
Aggregation 查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。
详情参考:https://www.elastic.co/guide/en/elasticsearch/client/Java-api/current/java-aggs.html
三、集群配置
配置文件 elasticsearch.yml
集群名和节点名:
#cluster.name: elasticsearch
#node.name: “Franz Kafka”
是否参与 master 选举和是否存储数据
#node.master: true
#node.data: true
分片数和副本数
#index.number_of_shards: 5
#index.number_of_replicas: 1
master 选举最少的节点数,这个一定要设置为整个集群节点个数的一半加 1,即 N /2+1
#discovery.zen.minimum_master_nodes: 1
discovery ping 的超时时间,拥塞网络,网络状态不佳的情况下设置高一点
#discovery.zen.ping.timeout: 3s
注意,分布式系统整个集群节点个数 N 要为奇数个!!
四、Elasticsearch 插件
1、elasticsearch-head 是一个 elasticsearch 的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head
2、elasticsearch-sql:使用 SQL 语法查询 elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip –install sql
github 地址:https://github.com/NLPchina/elasticsearch-sql
3、elasticsearch-bigdesk 是 elasticsearch 的一个集群监控工具,可以通过它来查看 ES 集群的各种状态。
安装:./bin/plugin -install lukas-vlcek/bigdesk
访问:http://192.103.101.203:9200/_plugin/bigdesk/,
4、elasticsearch-servicewrapper 插件是 ElasticSearch 的服务化插件,
在 https://github.com/elasticsearch/elasticsearch-servicewrapper 下载该插件后,解压缩,将 service 目录拷贝到 elasticsearch 目录的 bin 目录下。
而后,可以通过执行以下语句安装、启动、停止 ElasticSearch:
sh elasticsearch install
sh elasticsearch start
sh elasticsearch stop
Linux 上安装部署 ElasticSearch 全程记录 http://www.linuxidc.com/Linux/2015-09/123241.htm
Elasticsearch 安装使用教程 http://www.linuxidc.com/Linux/2015-02/113615.htm
ElasticSearch 配置文件译文解析 http://www.linuxidc.com/Linux/2015-02/114244.htm
ElasticSearch 集群搭建实例 http://www.linuxidc.com/Linux/2015-02/114243.htm
分布式搜索 ElasticSearch 单机与服务器环境搭建 http://www.linuxidc.com/Linux/2012-05/60787.htm
ElasticSearch 的工作机制 http://www.linuxidc.com/Linux/2014-11/109922.htm
ElasticSearch 的详细介绍 :请点这里
ElasticSearch 的下载地址 :请点这里
本文永久更新链接地址 :http://www.linuxidc.com/Linux/2015-10/124046.htm