MongoDB 通过 Java 代码 批量操作

Bulk 操作

  • MongoDB 提供了一种称为 BulkWrite(块)操作方式,数据不会被立即被持久化到数据库中,而是等待程序调度,确定合适的时间持久化到数据库中,其实就是批量操作
  • API com.mongodb.client.MongoCollection#bulkWrite(java.util.List<? extends com.mongodb.client.model.WriteModel<? extends TDocument>>)
  • Bulk 操作支持有序操作,和无序操作两种模式。
  • 有序操作的数据,会按照顺序操作,一旦发生错误,操作就会终止;无序操作,不安顺序执行,只会报告哪些操作发生了错误。
  • Bulk 操作支持增删改三种操作,对应的 model 分别是 :

InsertOneModel(final T document):单文档添加模型

UpdateOneModel(final Bson filter, final Bson update):单文档更新模型

UpdateManyModel(final Bson filter, final Bson update):多文档更新模型

DeleteOneModel(final Bson filter):单文档删除模型

DeleteManyModel(final Bson filter):多文档删除模型

ReplaceOneModel(final Bson filter, final T replacement):单文档替换模型

它们都继承于 com.mongodb.client.model.WriteModel。

有序操作

  • 有序批量操作的数据,会按照顺序操作,中间一旦发生错误,操作就会终止,即后面的 Model 不会再执行;
  • 使用:BulkWriteResult bulkWrite(List<? extends WriteModel<? extends TDocument>> requests)
package www.wmx.com;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.*;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Created by Administrator on 2018/9/17 0017.
 * 文档测试拓展
 */
public class MongoDocumentExt {

    /**
     * 有序操作 ————
     *
     * @param databaseName
     * @param collectionName
     */
    public static void sequentialOperation(String databaseName, String collectionName) {
        if (databaseName != null && !"".equals(databaseName) && collectionName != null && !"".equals(collectionName)) {
            /** MongoClient(String host, int port):直接指定 MongoDB IP 与端口进行连接
             * 实际应用中 MongoDB 地址应该配置在配置文件中*/
            MongoClient mongoClient = new MongoClient("127.0.0.1", 27017);

            /**getDatabase(String databaseName):获取指定的数据库
             * 如果此数据库不存在,则会自动创建,此时存在内存中,服务器不会存在真实的数据库文件,show dbs 命令 看不到
             * 如果再往其中添加数据,服务器则会生成数据库文件,磁盘中会真实存在,show dbs 命令 可以看到
             * */
            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);

            /**获取数据库中的集合
             * 如果集合不存在,则会隐式创建,此时在内存中,MongoDB 客户端 show tables 看不到
             * 如果继续往集合插入值,则会真实写入磁盘中,show tables 会有值*/
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collectionName);

            Long documentSize = mongoCollection.countDocuments();
            System.out.println("集合中文档总数为:" + documentSize);

            /**InsertOneModel(T document)*/
            Document insertDocument = new Document("name", "insertModel");
            insertDocument.append("time", new Date().getTime());
            InsertOneModel<Document> insertOneModel = new InsertOneModel<Document>(insertDocument);

            /**UpdateOneModel(Bson filter, Bson update)*/
            Document updateDocument = new Document();
            updateDocument.append("$set", new Document("name", "updateOneModel").append("time", new Date().getTime()));
            Bson updateBson = Filters.eq("age", 2);
            UpdateOneModel<Document> updateOneModel = new UpdateOneModel<Document>(updateBson, updateDocument);

            /**DeleteOneModel(Bson filter)
             * 同理还有 DeleteManyModel<T>
             */
            Bson delBson = Filters.eq("age", 3);
            DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<Document>(delBson);

            /**ReplaceOneModel(Bson filter, T replacement)
             * 同理还有 UpdateManyModel<T> extends WriteModel<T> */
            Bson repBson = Filters.eq("age", 4);
            Document replaceDocument = new Document("name", "replaceOneModel");
            replaceDocument.put("time", new Date().getTime());
            ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(repBson, replaceDocument);

            List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
            writeModelList.add(insertOneModel);
            writeModelList.add(updateOneModel);
            writeModelList.add(deleteOneModel);
            writeModelList.add(replaceOneModel);

            /**bulkWrite(List<? extends WriteModel<? extends TDocument>> var1):有序操作
             * 有一个重载的方法:bulkWrite(List<? extends WriteModel<? extends TDocument>> var1, BulkWriteOptions var2)
             * 用于设置是有序操作还是无序操作:
             * mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
             * false 表示无序操作,true 表示有序操作
             * */
            mongoCollection.bulkWrite(writeModelList);
            mongoClient.close();
        }
    }

    public static void main(String[] args) {
        sequentialOperation("java", "c2");
    }
}

操作前:

> db
java
> db.c2.find()
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3a"), "name" : "Lisi", "age" : 1, "desc" : "USB" }
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3b"), "name" : "Lisi", "age" : 2, "desc" : "USA" }
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3c"), "name" : "Lisi", "age" : 3, "desc" : "USB" }
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3d"), "name" : "Lisi", "age" : 4, "desc" : "USB" }
>

操作后:

> db
java
> db.c2.find()
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3a"), "name" : "Lisi", "age" : 1, "desc" : "USB" }
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3b"), "name" : "updateOneModel", "age" : 2, "desc" : "USA", "time" : NumberLong("1537175441896") }
{ "_id" : ObjectId("5b9f4e28218f0d439830bb3d"), "name" : "replaceOneModel", "time" : NumberLong("1537175441898") }
{ "_id" : ObjectId("5b9f6fa0218f0d2b24a26832"), "name" : "insertModel", "time" : NumberLong("1537175441895") }
>

  • 如上所示批量执行了4个 WriteModel,如果中间某一个 WriteModel 操作失败,则后续的 WriteModel 不会再执行

无序操作

  • 无序操作,不按顺序执行,只会报告哪些操作发生了错误,发送错误后,其余的 操作不受影响,继续执行。
  • 使用:BulkWriteResult bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options)
  /**
     * 无序操作 ————
     *
     * @param databaseName   :数据库名称
     * @param collectionName :集合名称
     */
    public static void disorderOperation(String databaseName, String collectionName) {
        if (databaseName != null && !"".equals(databaseName) && collectionName != null && !"".equals(collectionName)) {
            /** MongoClient(String host, int port):直接指定 MongoDB IP 与端口进行连接
             * 实际应用中 MongoDB 地址应该配置在配置文件中*/
            MongoClient mongoClient = new MongoClient("127.0.0.1", 27017);

            /**getDatabase(String databaseName):获取指定的数据库
             * 如果此数据库不存在,则会自动创建,此时存在内存中,服务器不会存在真实的数据库文件,show dbs 命令 看不到
             * 如果再往其中添加数据,服务器则会生成数据库文件,磁盘中会真实存在,show dbs 命令 可以看到
             * */
            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);

            /**获取数据库中的集合
             * 如果集合不存在,则会隐式创建,此时在内存中,MongoDB 客户端 show tables 看不到
             * 如果继续往集合插入值,则会真实写入磁盘中,show tables 会有值*/
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collectionName);

            Long documentSize = mongoCollection.countDocuments();
            System.out.println("集合中文档总数为:" + documentSize);

            /**InsertOneModel(T document)*/
            Document insertDocument = new Document("name", "insertModel");
            insertDocument.append("time", new Date().getTime());
            InsertOneModel<Document> insertOneModel = new InsertOneModel<Document>(insertDocument);

            /**UpdateOneModel(Bson filter, Bson update)*/
            Document updateDocument = new Document();
            updateDocument.append("$set", new Document("name", "updateOneModel").append("time", new Date().getTime()));
            Bson updateBson = Filters.eq("age", 2);
            UpdateOneModel<Document> updateOneModel = new UpdateOneModel<Document>(updateBson, updateDocument);

            /**DeleteOneModel(Bson filter)
             * 同理还有 DeleteManyModel<T>
             */
            Bson delBson = Filters.eq("age", 3);
            DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<Document>(delBson);

            /**ReplaceOneModel(Bson filter, T replacement)
             * 同理还有 UpdateManyModel<T> extends WriteModel<T> */
            Bson repBson = Filters.eq("age", 4);
            Document replaceDocument = new Document("name", "replaceOneModel");
            replaceDocument.put("time", new Date().getTime());
            ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(repBson, replaceDocument);

            List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
            writeModelList.add(insertOneModel);
            writeModelList.add(updateOneModel);
            writeModelList.add(deleteOneModel);
            writeModelList.add(replaceOneModel);

            /**bulkWrite(List<? extends WriteModel<? extends TDocument>> var1):有序操作
             * 有一个重载的方法:bulkWrite(List<? extends WriteModel<? extends TDocument>> var1, BulkWriteOptions var2)
             * 用于设置是有序操作还是无序操作:
             * mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
             * false 表示无序操作,true 表示有序操作
             * */
            mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
            mongoClient.close();
        }
    }

    public static void main(String[] args) {
        disorderOperation("java", "c2");
    }

操作前:

> db.c2.find()
{ "_id" : ObjectId("5b9f737d1ea234088a722a49"), "name" : "Lisi", "age" : 1 }
{ "_id" : ObjectId("5b9f737d1ea234088a722a4a"), "name" : "Lisi", "age" : 2 }
{ "_id" : ObjectId("5b9f737d1ea234088a722a4b"), "name" : "Lisi", "age" : 3 }
{ "_id" : ObjectId("5b9f737d1ea234088a722a4c"), "name" : "Lisi", "age" : 4 }

操作后:

> db.c2.find()
{ "_id" : ObjectId("5b9f737d1ea234088a722a49"), "name" : "Lisi", "age" : 1 }
{ "_id" : ObjectId("5b9f737d1ea234088a722a4a"), "name" : "updateOneModel", "age" : 2, "time" : NumberLong("1537176460236") }
{ "_id" : ObjectId("5b9f737d1ea234088a722a4c"), "name" : "replaceOneModel", "time" : NumberLong("1537176460237") }
{ "_id" : ObjectId("5b9f738c218f0d340cbfe2c1"), "name" : "insertModel", "time" : NumberLong("1537176460235") }
>

  • 如上所示批量执行了4个 WriteModel,如果中间某一个 WriteModel 操作失败,则其余的 WriteModel 照样会再执行

类文件完整内容

package www.wmx.com;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.*;
import org.bson.Document;
import org.bson.conversions.Bson;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Created by Administrator on 2018/9/17 0017.
 * 文档测试拓展
 */
public class MongoDocumentExt {

    /**
     * 有序操作 ————
     *
     * @param databaseName   :数据库名称
     * @param collectionName :集合名称
     */
    public static void sequentialOperation(String databaseName, String collectionName) {
        if (databaseName != null && !"".equals(databaseName) && collectionName != null && !"".equals(collectionName)) {
            /** MongoClient(String host, int port):直接指定 MongoDB IP 与端口进行连接
             * 实际应用中 MongoDB 地址应该配置在配置文件中*/
            MongoClient mongoClient = new MongoClient("127.0.0.1", 27017);

            /**getDatabase(String databaseName):获取指定的数据库
             * 如果此数据库不存在,则会自动创建,此时存在内存中,服务器不会存在真实的数据库文件,show dbs 命令 看不到
             * 如果再往其中添加数据,服务器则会生成数据库文件,磁盘中会真实存在,show dbs 命令 可以看到
             * */
            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);

            /**获取数据库中的集合
             * 如果集合不存在,则会隐式创建,此时在内存中,MongoDB 客户端 show tables 看不到
             * 如果继续往集合插入值,则会真实写入磁盘中,show tables 会有值*/
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collectionName);

            Long documentSize = mongoCollection.countDocuments();
            System.out.println("集合中文档总数为:" + documentSize);

            /**InsertOneModel(T document)*/
            Document insertDocument = new Document("name", "insertModel");
            insertDocument.append("time", new Date().getTime());
            InsertOneModel<Document> insertOneModel = new InsertOneModel<Document>(insertDocument);

            /**UpdateOneModel(Bson filter, Bson update)*/
            Document updateDocument = new Document();
            updateDocument.append("$set", new Document("name", "updateOneModel").append("time", new Date().getTime()));
            Bson updateBson = Filters.eq("age", 2);
            UpdateOneModel<Document> updateOneModel = new UpdateOneModel<Document>(updateBson, updateDocument);

            /**DeleteOneModel(Bson filter)
             * 同理还有 DeleteManyModel<T>
             */
            Bson delBson = Filters.eq("age", 3);
            DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<Document>(delBson);

            /**ReplaceOneModel(Bson filter, T replacement)
             * 同理还有 UpdateManyModel<T> extends WriteModel<T> */
            Bson repBson = Filters.eq("age", 4);
            Document replaceDocument = new Document("name", "replaceOneModel");
            replaceDocument.put("time", new Date().getTime());
            ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(repBson, replaceDocument);

            List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
            writeModelList.add(insertOneModel);
            writeModelList.add(updateOneModel);
            writeModelList.add(deleteOneModel);
            writeModelList.add(replaceOneModel);

            /**bulkWrite(List<? extends WriteModel<? extends TDocument>> var1):有序操作
             * 有一个重载的方法:bulkWrite(List<? extends WriteModel<? extends TDocument>> var1, BulkWriteOptions var2)
             * 用于设置是有序操作还是无序操作:
             * mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
             * false 表示无序操作,true 表示有序操作
             * */
            mongoCollection.bulkWrite(writeModelList);
            mongoClient.close();
        }
    }

    /**
     * 无序操作 ————
     *
     * @param databaseName   :数据库名称
     * @param collectionName :集合名称
     */
    public static void disorderOperation(String databaseName, String collectionName) {
        if (databaseName != null && !"".equals(databaseName) && collectionName != null && !"".equals(collectionName)) {
            /** MongoClient(String host, int port):直接指定 MongoDB IP 与端口进行连接
             * 实际应用中 MongoDB 地址应该配置在配置文件中*/
            MongoClient mongoClient = new MongoClient("127.0.0.1", 27017);

            /**getDatabase(String databaseName):获取指定的数据库
             * 如果此数据库不存在,则会自动创建,此时存在内存中,服务器不会存在真实的数据库文件,show dbs 命令 看不到
             * 如果再往其中添加数据,服务器则会生成数据库文件,磁盘中会真实存在,show dbs 命令 可以看到
             * */
            MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName);

            /**获取数据库中的集合
             * 如果集合不存在,则会隐式创建,此时在内存中,MongoDB 客户端 show tables 看不到
             * 如果继续往集合插入值,则会真实写入磁盘中,show tables 会有值*/
            MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collectionName);

            Long documentSize = mongoCollection.countDocuments();
            System.out.println("集合中文档总数为:" + documentSize);

            /**InsertOneModel(T document)*/
            Document insertDocument = new Document("name", "insertModel");
            insertDocument.append("time", new Date().getTime());
            InsertOneModel<Document> insertOneModel = new InsertOneModel<Document>(insertDocument);

            /**UpdateOneModel(Bson filter, Bson update)*/
            Document updateDocument = new Document();
            updateDocument.append("$set", new Document("name", "updateOneModel").append("time", new Date().getTime()));
            Bson updateBson = Filters.eq("age", 2);
            UpdateOneModel<Document> updateOneModel = new UpdateOneModel<Document>(updateBson, updateDocument);

            /**DeleteOneModel(Bson filter)
             * 同理还有 DeleteManyModel<T>
             */
            Bson delBson = Filters.eq("age", 3);
            DeleteOneModel<Document> deleteOneModel = new DeleteOneModel<Document>(delBson);

            /**ReplaceOneModel(Bson filter, T replacement)
             * 同理还有 UpdateManyModel<T> extends WriteModel<T> */
            Bson repBson = Filters.eq("age", 4);
            Document replaceDocument = new Document("name", "replaceOneModel");
            replaceDocument.put("time", new Date().getTime());
            ReplaceOneModel<Document> replaceOneModel = new ReplaceOneModel<Document>(repBson, replaceDocument);

            List<WriteModel<Document>> writeModelList = new ArrayList<WriteModel<Document>>();
            writeModelList.add(insertOneModel);
            writeModelList.add(updateOneModel);
            writeModelList.add(deleteOneModel);
            writeModelList.add(replaceOneModel);

            /**bulkWrite(List<? extends WriteModel<? extends TDocument>> var1):有序操作
             * 有一个重载的方法:bulkWrite(List<? extends WriteModel<? extends TDocument>> var1, BulkWriteOptions var2)
             * 用于设置是有序操作还是无序操作:
             * mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
             * false 表示无序操作,true 表示有序操作
             * */
            mongoCollection.bulkWrite(writeModelList, new BulkWriteOptions().ordered(false));
            mongoClient.close();
        }
    }

    public static void main(String[] args) {
        disorderOperation("java", "c2");
    }

}

猜你喜欢

转载自blog.csdn.net/wangmx1993328/article/details/82745423