在上一篇文章中,咱们介绍了Elasticsearch java client的一些基本用法,为了达到生产级别的运用规范,下面介绍一些进阶的用法。

1.客户端tcp衔接超时

在咱们创立客户端时,实践上创立的是RestClient,而底层运用的是apacheHttpClient,在创立后长期无操作时这个衔接或许会被封闭,此时客户端并不知晓,直接运用就会提示下文中的过错。再次恳求又是正常的,因为客户端会重新创立衔接。

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-6 [ACTIVE]
  at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:915)
  at org.elasticsearch.client.RestClient.performRequest(RestClient.java:300)
  at org.elasticsearch.client.RestClient.performRequest(RestClient.java:288)
  at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
  at co.elastic.clients.elasticsearch.ElasticsearchClient.search(ElasticsearchClient.java:1833)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-6 [ACTIVE]
  at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:381)
  at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
  at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
  at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
  at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
  at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
  at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
  at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
  at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
  at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
  at java.lang.Thread.run(Thread.java:748)

这个问题在Github的官方Issues上面也有相关的讨论:github.com/elastic/ela…

解决问题的关键在于让衔接可以坚持tcp keepalive,有两种计划:

计划一:在客户端中显式的敞开keepalive选项

RestClient httpClient = RestClient.builder(new HttpHost(hostName, port))
    .setHttpClientConfigCallback(hc -> hc
        .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())
     ).build();

别的,还需求设置体系层面的tcp keepalive探测时间,默认值7200s太长或许会被自动封闭,建议修改为300s,默认配置如下:

net.ipv4.tcp_keepalive_time = 7200
net.ipv4.tcp_keepalive_intvl = 75
net.ipv4.tcp_keepalive_probes = 9

计划二:在客户端设置keepalive策略,在超越指定时间后由客户端自行封闭,运用时再重新创立

RestClient httpClient = RestClient.builder(new HttpHost(hostName, port))
    .setHttpClientConfigCallback(hc -> hc
            .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis()))
    .build();

2.聚合核算

在Elasticsearch中Aggregation 分为3种类型:

  • Metric: 核算类型,对字段进行核算平均值、求和等;
  • Bucket: 分组核算,根据字段或许规模将文档分组到桶中进行核算;
  • Pipeline:对聚合成果再次进行聚合核算;

在分组核算中有2个参数需求特别关注:Size、Shard size。官方文档:www.elastic.co/guide/en/el…

Size:在运用terms对字段进行分桶时,默认值回来top 10文档,即只有10个核算成果,经过设置size的巨细可以回来所需巨细,最大值不超越search.max_buckets

MultiTermsAggregation aggregation = MultiTermsAggregation.of(s -> s.terms(
    MultiTermLookup.of(t->t.field("product")),
    MultiTermLookup.of(t->t.field("user"))
).size(100));

Shard size:在上一篇文章中,咱们介绍过Elasticsearch的查询进程,需求从每个分片获取成果后,再由和谐节点进行兼并排序。因为数据分布不均匀的原因,如果每个分片只获取size巨细的文档,或许会出现核算误差。

Elasticsearch的解决计划是获取比所需更多的文档,在必定程度上避免这个问题,也就是Shard size参数的用处。默认值:Shard size = size * 1.5 + 10,在数据偏斜严重的情况下,可以恰当调大这个参数,当然也意味着更多的功能损耗。

3.数据快照

之前咱们介绍过search_after 可以完成深度分页功用,而在一些大批量数据导出的场景下,一般需求坚持数据游标不变来导出完整的数据,类似于快照的功用。而这就需求用到 point in time (PIT)

//获取pit id
OpenPointInTimeRequest openRequest = OpenPointInTimeRequest.of(o -> o
    .index(getIndex())
    .keepAlive(Time.of(t->t.time("10m"))));
OpenPointInTimeResponse openResponse = elasticsearchClient.openPointInTime(openRequest);
//查询数据
SearchRequest searchRequest = new SearchRequest.Builder()
    .size(pageSize)
    .sort(sortOptions)
    .pit(p -> p.id(params.getPit()));
    .build();
elasticsearchClient.search(searchRequest);    
//封闭pit
ClosePointInTimeRequest closeRequest = ClosePointInTimeRequest.of(c -> c.id(pit));
elasticsearchClient.closePointInTime(closeRequest);

4.获取查找成果数量

在默认情况下search接口回来的hits size最大值是10000,如果需求获取实践的成果总数,需求敞开TrackHits

SearchRequest searchRequest = new SearchRequest.Builder()
    .trackTotalHits(TrackHits.of(t->t.enabled(true)));
    .build();
elasticsearchClient.search(searchRequest);    

5.并发写入

一般情况下,Elasticsearch数据的写入会经过mq来进行触发,理论上可以经过mq的有序性来操控并发写入导致的数据覆盖问题,现实情况中考虑到功能、可靠性,较少选用这种方式。

计划一:添加version数据版别字段,经过CAS操作来完成乐观锁

计划二:运用分布式锁,确保同一时间单个文档只有一个线程在履行更新操作,重试操作可以由mq来完成;

6.数据库业务

假定你正在运用Elasticsearch存储订单数据,在业务代码中的履行步骤如下:

  • 更新MySQL中订单表数据;
  • 发送订单变更的mq通知;
  • 消费mq消息,从MySQL读取最新的数据写入Elasticsearch;

在运转一段时间后,你或许会发现Elasticsearch的数据与MySQL不一致,不是最新的版别;仔细分析上述进程会发现一个问题,在履行第2步操作时第1步的数据业务还没提交完成,将导致第3读取的不是最新数据。提供一种解决问题的思路,在业务提交完成后再发送mq消息。

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter(){
    @Override
    public void afterCommit() {
        //发送mq
    }
});

今日就先写到这里,你学”废”了吗。

Elasticsearch Client 进阶使用