Berkeley DB TPS测试代码

最近在做一个高并发的服务中心,需要把数据写入 MySQL,结果测试发现最大 TPS 只有 4K。经过讨论,决定先把接收到的数据写入本地存储,再由同步线程同步到 MySQL。

最初本地存储选用的是 SQLite,但测试发现 SQLite 对并发的支持有问题;随后改选 Berkeley DB,经过测试发现 Berkeley DB 能够满足需求。

BerkeleyDB测试代码如下:

注:代码还有可以改造的地方,例如 initAndCheck 方法可以去掉方法级同步,改为只在初始化阶段做一次同步,请在项目中自行修改。

package test.berkelyDb;

import java.io.File;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.msgpack.MessagePack;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

public class TestBerkely {

    public Environment env;

    public Database db;

    private static final String dbName = "jsf";

    public synchronized void initAndCheck() throws Exception {
        if (env != null && env.isValid()) {
            return;
        }
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setCacheSize(10*1024 * 1024);
        try {
            env = new Environment(new File("e:\\test"), envConfig);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void open() {
        if(db != null){
            return;
        }
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setSortedDuplicates(true);
        dbConfig.setAllowCreate(true);
        try {
            db = env.openDatabase(null, dbName, dbConfig);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() {
        if (db != null) {
            try {
                db.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public Object get(String key) throws Exception {
        DatabaseEntry queryKey = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
        queryKey.setData(key.getBytes("UTF-8"));
        OperationStatus status = db.get(null, queryKey, value, LockMode.DEFAULT);
        if (status == OperationStatus.SUCCESS) {
            return new String(value.getData());
        }
        return null;
    }

    public boolean put(String key, byte values[]) throws Exception {
        byte[] theKey = key.getBytes("UTF-8");
        OperationStatus status = db.put(null, new DatabaseEntry(theKey),
                new DatabaseEntry(values));
        if (status == OperationStatus.SUCCESS) {
            return true;
        }
        return false;
    }
    
    public boolean del(String key) throws Exception{
        byte[] theKey = key.getBytes("UTF-8");
           OperationStatus status = db.delete(null, new DatabaseEntry(theKey));
           if(status == OperationStatus.SUCCESS) {
            return true;
           }
           return false;
    }

    public static void main(String[] args) throws Exception {
        final long len = 10000000;
        final TestBerkely tb = new TestBerkely();
        final AtomicInteger counter = new AtomicInteger(1);
        tb.initAndCheck();
        tb.open();
        Timer timer = new Timer();

        Client client = new Client();
        client.setAlias("[email protected]");
        client.setAppPath("E:\\workspace\\MyProject\\bin");
        client.setCreateTime(new Date());
        client.setId(100000);
        client.setInsKey(TestBerkely.class.getCanonicalName() + "::[email protected]");
        client.setInterfaceId(10092389);
        client.setIp("192.168.229.39");
        client.setPid(2398);
        client.setProtocol(1);
        client.setSafVer(120);
        client.setSrcType(1);
        client.setStartTime(System.currentTimeMillis());
        client.setStatus(1);
        client.setUniqKey("uniqKey");
        client.setUpdateTime(new Date());
        client.setUrlDesc("89uf92438yq29384yf");
        
        MessagePack mp = new MessagePack();
        mp.register(Client.class);
        final byte data[] = mp.write(client);
        
        ExecutorService exePool = Executors.newFixedThreadPool(10);


        final long start = System.currentTimeMillis();
        //统计TPS线程
        timer.schedule(new TimerTask() {
            
            @Override
            public void run() {
                long end = System.currentTimeMillis();
                long time = (end-start)/1000;
                if(time == 0){
                    return;
                }
                int current = counter.get();
                System.out.println("***********----------------->" + (current*100/time/100f));
                
            }
        },    1000, 2000);
        for(int i=0; i<10; i++){
            exePool.execute(new Runnable() {
                
                @Override
                public void run() {
                    int num = counter.getAndIncrement();
                    String key = "key" + num;
                    try {
                        while(true){
                            tb.put(key, data);
                            if(counter.get() < len) {
                                num = counter.getAndIncrement();
                                key = "key" + num;
                                continue;
                            }
                            break;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        
        
        exePool.shutdown();
        try {
            while(!exePool.awaitTermination(1, TimeUnit.SECONDS)){
                
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();
        
        long tps = len/((end - start)/1000);
        System.out.println("tps------------------>" + tps);
        
        timer.cancel();
        
        tb.env.sync();
        tb.env.cleanLog();

        counter.set(0);
        int errorNum = 0;
        //检查写入数据
        while(counter.get() < 100000){
            if(tb.get("key" + counter.getAndIncrement()) == null){
                errorNum++;
            }
        }
        System.out.println("error data is ----------->" + errorNum);
        tb.del("key" + 5000);
        tb.env.sync();
        tb.close();
    }

}


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。