本篇博客为 Elasticsearch 索引相关 Java API 示例,包括索引、更新、删除、批量请求(Bulk API)、批量处理器(BulkProcessor)等操作。
官网传送门
1. 准备索引
博客中的示例主要用到两个索引,mapping 如下:
1.twitter
"properties": {
"age": {
"type": "long"
},
"message": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"postDate": {
"type": "long"
},
"price": {
"type": "double"
},
"title": {
"type": "text"
}
}
2.schools
"properties": {
"address": {
"type": "keyword"
},
"classes_id": {
"type": "long"
},
"doc": {
"properties": {
"name": {
"type": "keyword"
}
}
},
"info": {
"properties": {
"version": {
"type": "keyword"
}
}
},
"name": {
"type": "text"
},
"price": {
"type": "double"
},
"timestamp": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"users": {
"type": "nested",
"properties": {
"age": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"price": {
"type": "float"
}
}
}
}
2. 引入依赖
<properties>
<elasticsearch.version>5.2.2</elasticsearch.version>
</properties>
<!--es的依赖-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
3. 配置
yaml文件
es:
  # NOTE: cluster.name in the server's elasticsearch.yml must be uncommented and set to the
  # same value as cluster-name below; otherwise the client fails with NoNodeAvailableException.
  cluster-name: my-application
  # one or more comma-separated IP literals
  ip: 127.0.0.1
  # transport port used by the TransportClient
  port: 9300
config类
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author yangyang7_kzx
* @date 2019/11/26 17:36
* @description
*/
@Slf4j
@Configuration
public class EsConfig {
@Value("${es.cluster-name}")
private String clusterName;
@Value("${es.ip}")
private String ips;
@Value("${es.port}")
private String port;
@Bean("dataClient")
public TransportClient getTransPortClient() {
TransportClient client = null;
Settings settings = Settings.builder()
// 集群名, 如果你需要更改集群名(默认是elasticsearch)
.put("cluster.name", clusterName)
// 设置client.transport.sniff为true来使客户端去嗅探整个集群的状态,
// 把集群中其它机器的ip地址加到客户端中。这样做的好处是,一般你不用手动
// 设置集群里所有集群的ip到连接客户端,它会自动帮你添加,并且自动发现新加入集群的机器。
.put("client.transport.sniff", false)
.build();
String[] ipArray = ips.split(",");
try {
client = new PreBuiltTransportClient(settings);
for (String ip : ipArray) {//添加集群IP列表
TransportAddress transportAddress = new InetSocketTransportAddress(InetAddresses.forString(ip), Integer.parseInt(port));
client.addTransportAddresses(transportAddress);
}
} catch (Exception e) {
log.error("create es client error:{}", e);
if (client != null) {
client.close();
}
}
return client;
}
}
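@Slf4j 是 Lombok 提供的日志注解,会为类生成 log 字段;如果项目未使用 Lombok,删除该注解后需自行声明 log 字段,例如 private static final Logger log = LoggerFactory.getLogger(EsConfig.class);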
4. 示例
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
 * Index API examples: index, delete, update (script / partial doc / upsert), bulk request and
 * BulkProcessor.
 *
 * <p>The tests use the injected {@link TransportClient} and write to index {@code twitter},
 * type {@code tweet}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IndexTest {

    // Alternatively inject by bean name: @Resource(name = "dataClient")
    @Autowired
    private TransportClient client;

    /** Indexes document "1" whose source is built with the XContent JSON builder. */
    @Test
    public void index() throws IOException {
        IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "simagang")
                        .field("postDate", System.currentTimeMillis())
                        .field("message", "trying out Elasticsearch")
                        .endObject()
                )
                .execute()
                .actionGet();
    }

    /** Indexes a document from a raw JSON string; no id is given, so the server assigns one. */
    @Test
    public void indexJson() {
        String json = "{" +
                "\"name\":\"yy\"," +
                "\"postDate\": 123456," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse response = client.prepareIndex("twitter", "tweet")
                .setSource(json)
                .execute()
                .actionGet();
    }

    /**
     * Deletes a document by id.
     *
     * <p>The id below looks like a server-generated one (e.g. from {@link #indexJson()}); replace
     * it with an id that exists in your index.
     */
    @Test
    public void delete() {
        DeleteResponse response = client.prepareDelete("twitter", "tweet", "AW6nWQXDCJTyfWC03Cpb")
                .execute()
                .actionGet();
    }

    /**
     * Update via script: sets {@code name} of document "1" with an inline script.
     */
    @Test
    public void updateByScript() throws ExecutionException, InterruptedException {
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1");
        updateRequest.script(new Script("ctx._source.name = \"sima\""));
        client.update(updateRequest).get();
    }

    /**
     * Partial update: merges the given fields into document "1".
     */
    @Test
    public void update() throws ExecutionException, InterruptedException, IOException {
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                .doc(jsonBuilder()
                        .startObject()
                        .field("name", "yy")
                        .endObject());
        client.update(updateRequest).get();
    }

    /**
     * Upsert: applies the partial doc if document "1" exists, otherwise indexes {@code indexRequest}.
     */
    @Test
    public void upSert() throws ExecutionException, InterruptedException, IOException {
        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")
                .source(jsonBuilder()
                        .startObject()
                        .field("name", "Joe Smith")
                        .field("gender", "male")
                        .endObject());
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
                .doc(jsonBuilder()
                        .startObject()
                        .field("name", "upyy")
                        .endObject())
                .upsert(indexRequest);
        client.update(updateRequest).get();
    }

    /**
     * Bulk API: indexes documents "1" and "3" in a single request.
     *
     * @throws AssertionError if any item of the bulk request failed
     */
    @Test
    public void bulkApi() throws ExecutionException, InterruptedException, IOException {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "kimchy1")
                        .field("postDate", 12345123123L)
                        .field("message", "trying out Elasticsearch")
                        .endObject()
                )
        );
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "3")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "kimchy3")
                        .field("postDate", 12345123123L)
                        .field("message", "another post")
                        .endObject()
                )
        );
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            // A bulk response can report per-item failures even though the call itself succeeded;
            // surface them instead of silently ignoring them.
            throw new AssertionError(bulkResponse.buildFailureMessage());
        }
    }

    /**
     * BulkProcessor: queues an index and a delete request and lets the processor batch and
     * flush them.
     *
     * @throws AssertionError if the processor does not finish within 10 seconds or any bulk
     *     (or bulk item) fails
     */
    @Test
    public void BulkProcessor() throws InterruptedException {
        // The listener runs asynchronously; remember the first failure so the test can report it.
        AtomicReference<Throwable> bulkFailure = new AtomicReference<>();
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                // the Elasticsearch client
                client,
                new BulkProcessor.Listener() {
                    // Called before each bulk is executed; request.numberOfActions() gives its size.
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    // Called after each bulk is executed; individual items may still have failed.
                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        if (response.hasFailures()) {
                            bulkFailure.compareAndSet(null,
                                    new IllegalStateException(response.buildFailureMessage()));
                        }
                    }

                    // Called when the whole bulk failed with a Throwable.
                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        bulkFailure.compareAndSet(null, failure);
                    }
                })
                // Optionally execute a bulk every 10,000 requests:
                // .setBulkActions(10000)
                // Optionally execute a bulk every 1 GB:
                // .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                // Flush every 5 seconds regardless of the number of pending requests.
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                // Concurrent requests: 0 allows only a single request to execute; 1 allows one bulk
                // to execute concurrently while new bulk requests are being accumulated.
                .setConcurrentRequests(1)
                .build();

        IndexRequest request = new IndexRequest("twitter", "tweet", "2");
        String jsonString = "{" +
                "\"name\":\"kimchy1\"," +
                "\"postDate\":1231356," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        request.source(jsonString);
        bulkProcessor.add(request);
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "3"));

        // Close the processor (which flushes pending requests) and wait for in-flight bulks,
        // instead of sleeping for a fixed time and never closing it.
        if (!bulkProcessor.awaitClose(10, TimeUnit.SECONDS)) {
            throw new AssertionError("BulkProcessor did not finish within 10 seconds");
        }
        Throwable failure = bulkFailure.get();
        if (failure != null) {
            throw new AssertionError("bulk execution failed", failure);
        }
    }
}
源码地址见项目推荐
项目推荐
IT-CLOUD :IT服务管理平台,集成基础服务,中间件服务,监控告警服务等。
IT-CLOUD-ACTIVITI6 :Activiti教程源码。博文在本CSDN Activiti系列中。
IT-CLOUD-ELASTICSEARCH :elasticsearch教程源码。博文在本CSDN elasticsearch系列中。开源项目,持续更新中,喜欢请 Star~
扫描二维码关注公众号,回复: 9061697 查看本文章