MongoDB线程安全批量处理
Mongo批处理工具类:
package com.saike.solr.server.util; import java.net.UnknownHostException; import java.util.ArrayList; import com.mongodb.BasicDBObject; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.Mongo; import com.mongodb.MongoException; import com.mongodb.MongoOptions; /** * 批处理工具类 * @author xieyong * */ public class UtileMongDB { UtilThreadLocal<ArrayList<DBObject>> localBatch; /**mongo单例对象 根据官方文档mongojava是线程安全的*/ private static Mongo mongo; private static DBCollection coll; //private static Log log = LogFactory.getLog(UtileMongDB.class); private static DB db; static{ /** 实例化db*/ MongoOptions options = new MongoOptions(); options.autoConnectRetry = true; options.connectionsPerHost = 1000; options.maxWaitTime = 5000; options.socketTimeout = 0; options.connectTimeout = 15000; options.threadsAllowedToBlockForConnectionMultiplier = 5000; try { mongo = new Mongo(MongoDBConstant.MONGO_HOST,MongoDBConstant.MONGO_PORT); } catch (UnknownHostException | MongoException e) { e.printStackTrace(); } // boolean auth = db.authenticate(myUserName, myPassword); } public UtileMongDB(){ try { localBatch = new UtilThreadLocal<ArrayList<DBObject>>(ArrayList.class); } catch (Exception e) { e.printStackTrace(); } } /** * 返回db对象 * @return db */ public static DB getDB(){ if(db==null){ db = mongo.getDB(MongoDBConstant.MONGO_DB); } return db; } /** * 返回mongo * @return mongo连接池 */ public static Mongo getMong(){ return mongo; } /** * 读取集合 * @return mongo集合 * */ public static DBCollection getColl(String collname){ return getDB().getCollection(collname); } public static DBCollection getColl(){ return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION); } /** crud操作 */ public void addBatch(String key,String value){ BasicDBObject basicDB = new BasicDBObject(); basicDB.put(key, value); /** 这里用线程本地变量,不用会存在竞技条件*/ localBatch.newGet().add(basicDB); } /** * 执行批处理 * */ public void executeInsertBatch(){ getColl().insert(localBatch.get()); localBatch.get().clear(); } /** * 执行批量删除 */ public void executeDeleteBatch(){ ArrayList<DBObject> array = localBatch.get(); for(DBObject obj:array){ getColl().remove(obj); } localBatch.get().clear(); } public DBCursor query(String key,String value){ BasicDBObject basicDBObject = new BasicDBObject(); basicDBObject.put(key,value); return getColl().find(basicDBObject); } }
ThreadLocal的封装:
package com.saike.solr.server.util; import java.lang.reflect.Constructor; /** * * @author xieyong * * @param <T> 本地线程变量对象了类型 */ public class UtilThreadLocal<T> extends ThreadLocal<T> { /**参数集合*/ Object[] obj; /**实例化构造函数*/ Constructor<T> construct; /** * * @param clazz 本地变量的class * @param args 构造函数的参数 * @throws NoSuchMethodException * @throws SecurityException */ public UtilThreadLocal(Class clazz,Object... args) throws NoSuchMethodException, SecurityException{ this.obj = obj; Class[] clazzs = null; /** new 获取参数class供获取构造函数用*/ if(args != null) if(args.length !=0){ clazzs = new Class[args.length]; for(int i = 0;i<args.length;i++){ clazzs[i] = args[i].getClass(); } } this.construct = clazz.getConstructor(clazzs); } /** * 如果当前线程没有对象创建一个新对象 * @return */ public T newGet(){ T tar = super.get() ; if(tar == null){ try { tar = construct.newInstance(obj); super.set(tar); }catch(Exception e){ e.printStackTrace(); } } return tar; } }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。