多线程webservie处理大量数据


因工作原因,需要将一个表(tbA)中的所有数据,根据user_id,去请求webserive获取相关的数据,然后插入到另外的一张表(tbB)中,供他人使用。不过这个表中的数据不少有78万条左右,而这样的大批量数据操作,还不能白天执行。只能在夜里,等服务器负荷低的时候进行执行。考虑如果webservice的效率不高的时候,需要对数据进行分批执行操作。经过综合考虑,最后采用多线程技术(不过最后经过测试,效率还是不错78万条数据,使用10个线程操作,8个多小时就可以了)。

    首先在tbA表中,追加一个字段,deal_tag,这个字段为处理标志字段。该字段的数据默认为'0',如果需要分批处理,则可以设计部分数据中的该字段值为'1'。这个时候就可以读取了。考虑到服务器的负荷问题,采用的方案是读取一部分数据到缓冲区中,同时更改已经进入缓冲区中待处理的数据的deal_tag字段值为‘B’,标识该数据已经进入缓冲区中。对数据的处理,需要请求webservice。也就有可能是因为因为种种原因,webservice没有正常的返回数据,或者出现异常,而为了不让异常影响数据的处理,则每次处理完一条数据,将该数据放入另外的缓冲区中,如果请求WebService成功,标识数据的字段为'F',请求成功。如果因为WebService或者其他的原因导致失败,则标志该数据字段的deal_tag字段值为'E',等待下一次的调用调用处理。另外为了不增加数据库的负荷,等到缓冲区中的数据达到一定的数据,一次提交。而不采用处理一条数据,提交一条数据,这样效率太慢。核心代码如下(因不想透漏具体的表名,采用tbA,tbB,tbC代替):

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;



import com.epro.DBUtility.CommonHelper;

import com.epro.DBUtility.DBHelper;

import com.epro.DBUtility.BatchData;

public class FetchData implements Runnable {

   

    private int READ_COUNT = 500; //一次读取数据库多少条记录到缓冲区中


    private int FLASH_BUFFER_COUNT = 2000; //处理后的数据,多少条进行提交到数据库



    private List<UserInfo> userList = null;

    private List<String> userStateSqlList = null;

    private List<BatchData> commitDataList = null;

   

    public FetchData(){

        userList = Collections.synchronizedList(new ArrayList<UserInfo>());

        userStateSqlList = new ArrayList<String>();

        commitDataList = new ArrayList<BatchData>();

    }

   

    //读取数据



    private synchronized int readData(){

        int result = 0;

        String strSql = "SELECT user_id, serial_number, cust_name, eparchy_code, " +

            "detail_install_address, link_phone, service_code, cust_type, rate, " +

            "in_date, deal_tag " +

            "FROM tbA " +

            "WHERE deal_tag in ('1','E') AND rownum <= " + READ_COUNT;
      

        ResultSet rs = null;

        DBHelper db = new DBHelper();       
        try {
            rs = db.executeQuery(strSql);

            while(rs.next()){

                UserInfo user = new UserInfo();
                user.setUserId(rs.getString("user_id"));
                user.setSerialNumber(rs.getString("serial_number"));

                user.setCustName(rs.getString("cust_name"));

                user.setEparchyCode(rs.getString("eparchy_code"));

                user.setDetailInstallAddress(rs.getString("detail_install_address"));

                user.setLinkPhone(rs.getString("link_phone"));

                user.setServiceCode(rs.getString("service_code"));

                user.setCustType(rs.getString("cust_type"));

                user.setRate(rs.getString("rate"));

                user.setInDate(rs.getString("in_date"));
                user.setDealTag(rs.getString("deal_tag"));

               
                userList.add(user); //放入队列中               



                ++result;

            }

        } catch (SQLException e) {

            e.printStackTrace();

        }finally{

            try {

                rs.close();

            } catch (SQLException e) {

                // TODO Auto-generated catch block



                e.printStackTrace();
            }
            finally{
                rs = null;

            }

            db.Close();

        }

        userIntoBuffer(); //将查询出来的数据,更改标志位为'B'



        return result;

    }

   

    private void userIntoBuffer(){

        int buffersize = this.userList.size();
        String sql = "";
        String userID = "";

        List<String> sqlList = new ArrayList<String>();

        if(buffersize > 0){

            DBHelper db = new DBHelper();   
            for(int i=0; i<buffersize; i++){
                userID = userList.get(i).getUserId();
                sql = this.getUserStateSql(userID, "B");
                sqlList.add(sql);               

            }
            db.doBatch(sqlList); //批量进行数据处理

            db.Close();

        }
    }
  
    private String getUserStateSql(String userID, String dealTag){
        String result = "";

        result = "UPDATE tbA SET deal_tag = '" + dealTag +
            "' WHERE user_id = " + userID;

       
        return result;
    }

   

    public synchronized UserInfo getUserInfo(){

        UserInfo result = null;

        if(userList.size() > 0){
           result = userList.remove(0);
        }

        return result;

    }
   
    //开始进行数据的处理


    public void beginDealData(){

        ResInfo resInfo = null;

        while(true){

           UserInfo user = this.getUserInfo();
            if(user != null && !"".equals(user.getUserId().trim())){ //当没有数据时,跳出循环

               //进行WebService请求,获取生产环境的资源数据,对数据进行解析,返回资源对象

                resInfo = getResInfo(user.getUserId());
            }
            else if(user == null){
                break;
            }
            else if(user != null && "".equals(user.getUserId().trim())){
                resInfo = new ResInfo();
                resInfo.out_err_id = "-1";
               resInfo.ln_line_flag = "0";
                resInfo.out_err_msg = "beginDealData is null.";
                System.out.println("user id is null.");
            }

            else{
               ;
            }
            userFinish(user, resInfo); //将用户的数据和资源的数据进行合并操作


        }
        int datacount = readData(); //读取表,看是否还有要处理的数据


       if(datacount > 0){

           beginDealData(); //递归调用,对需要处理的数据,进行继续读取


        }
        this.flashBuffer(); //做最后一次的提交处理后的buffer操作

        System.out.println("finish at " + CommonHelper.getCurrentDateTime());
    }
   
    public ResInfo getResInfo(String productID){
        ResInfoReader myReader = new ResInfoReader();
        return myReader.beginRequest(productID);
    }
   
    //获取地区名称


    public String getAreaName(String areaCode){

        String result = "";

        String strSql = "SELECT tb1.area_name area_name from " +

       "tbC tb1 " +

        "where tb1.area_code = ?";
        String[] param = { areaCode };
        ResultSet rs = null;
        DBHelper db = new DBHelper();
        rs = db.executeQuery(strSql, param);
        try {
            while(rs.next()){
                result = rs.getString("area_name");
            }
        } catch (SQLException e) {

            // TODO Auto-generated catch block

           e.printStackTrace();

       } finally{
            try {
               rs.close();
            } catch (SQLException e) {
                // TODO Auto-generated catch block

                e.printStackTrace();
            }
            db.Close();

        }
       
        return result;       
    }
   
    private synchronized int userFinish(UserInfo user, ResInfo resInfo){
        int result = 0;
        String strSql = "";
        String strAreaName = ""; //地区名称

        String[] param = null;
        if("".equals(user.getServiceCode())){
            strAreaName = this.getAreaName(user.getEparchyCode());
        }
        else{
            strAreaName = this.getAreaName(user.getServiceCode());
        }
        strSql = "INSERT INTO tbB(USER_ID, SERIAL_NUMBER, " +
            "CUST_NAME, DEAL_TYPE, EPARCHY_CODE, DETAIL_INSTALL_ADDRESS, LINK_PHONE, " +
            "EXCH_ID, LINEBOX_SEQ, LINEBOX, LINEBOX_ADDR, ROW_ID, COL_ID, VCOL_SEQ, " +
            "PCABLE, PCABLE_SEQ, OCABLE, OCABLE_SEQ, FPCONNECT_ROW_ID, FPCONNECT_COL_ID, " +
            "FPCONNECT_SEQ, FCONNECT, FCONNECT_ADDR, FCONNECT_NAME, FOCONNECT_ROW_ID, " +
            "FOCONNECT_COL_ID, FOCONNECT_SEQ, SPCONNECT_ROW_ID, SPCONNECT_COL_ID, " +
            "SPCONNECT_SEQ, SCONNECT, SCONNECT_ADDR, SCONNECT_NAME, SOCONNECT_ROW_ID, " +
            "SOCONNECT_COL_ID, SOCONNECT_SEQ, LN_FAC_CODE, LS_SWITCH_ID, LN_SWITCH_NAME, " +
            "LS_SWITCH_MODE, LN_FAC_TYPE, LS_NODE_CODE, LS_NODE_ID, LS_MAC_CODE, LS_FRAME_CODE, " +
            "LN_DHCOL_CODE, LS_FRAME_ID, LS_SLOT_ID, LS_PORT_ID, LN_DHCOL_SEQ, LN_LINE_ID, " +
            "LN_LINE_FLAG, RELA_PRODUCT_NO, MEASURE_NAME, MEASURE_CODE, LINE_TYPE, REMOVE_FLAG, " +
            "IN_TIME, OUT_TIME, EXCH_NAME, TRADE_ID, CUST_TYPE, RATE) VALUES(?, ?, ?, ?, ?, ?, " +
            "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
            "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, sysdate, null, " +
            "?, ?, ?, ?)";
        param = new String[]{
                user.getUserId(),
                user.getSerialNumber(),
                user.getCustName(),
                "",
                user.getEparchyCode(),
                user.getDetailInstallAddress(),
                user.getLinkPhone(),
                user.getServiceCode(),
                resInfo.linebox_seq,
                resInfo.linebox,
                resInfo.linebox_addr,
                resInfo.row_id,
                resInfo.col_id,
                resInfo.vcol_seq,
                resInfo.pcable,
                resInfo.pcable_seq,
                resInfo.ocable,
                resInfo.ocable_seq,
                resInfo.fpconnect_row_id,
                resInfo.fpconnect_col_id,
                resInfo.fpconnect_seq,
                resInfo.fconnect,
                resInfo.fconnect_addr,
                resInfo.fconnect_name,
                resInfo.foconnect_row_id,
                resInfo.foconnect_col_id,
                resInfo.foconnect_seq,
                resInfo.fpconnect_row_id,
                resInfo.fpconnect_col_id,
                resInfo.fpconnect_seq,
                resInfo.sconnect,
                resInfo.sconnect_addr,
                resInfo.sconnect_name,
                resInfo.soconnect_row_id,
                resInfo.soconnect_col_id,
                resInfo.soconnect_seq,
                resInfo.ln_fac_code,
                resInfo.ls_switch_id,
                resInfo.ln_switch_name,
                resInfo.ls_switch_mode,
                resInfo.ln_fac_type,
                resInfo.ls_node_code,
                resInfo.ls_node_id,
                resInfo.ls_mac_code,
                resInfo.ls_frame_code,
                resInfo.ln_dhcol_code,
                resInfo.ls_frame_id,
                resInfo.ls_slot_id,
                resInfo.ls_port_id,
                resInfo.ln_dhcol_seq,
                resInfo.ln_line_id,
                resInfo.ln_line_flag,
                resInfo.rela_product_no,
                resInfo.measure_name,
                resInfo.measure_code,
                resInfo.line_type,
               "0",
                strAreaName,
                "",
                user.getCustType(),
                user.getRate()   
        };       

        if("0".equals(resInfo.out_err_id)){
            BatchData data = new BatchData(strSql, param);
            commitDataList.add(data); //提交处理后的数据,到缓冲区

           
            result = 1;
            strSql = this.getUserStateSql(user.getUserId(), "F");
        }
        else{
            //当调用webservice后,不能正常的获取数据,则修改标志位为 'E'

            strSql = this.getUserStateSql(user.getUserId(), "E");
       }
        userStateSqlList.add(strSql); //提交用户状态数据到缓冲区
        checkBuffer();
       //System.out.println("deal user id: " + user.getUserId() + " at " + CommonHelper.getCurrentDateTime());

      return result;
    }
   
    private void checkBuffer(){

        if(userStateSqlList.size() >= FLASH_BUFFER_COUNT){
            this.flashBuffer();
        }
    }
   
    private synchronized void flashBuffer(){
        List<String> stateList = new ArrayList<String>();
        List<BatchData> commitList = new ArrayList<BatchData>();
        DBHelper db = new DBHelper();
       
        while(commitDataList.size()>0){
            BatchData data112 = commitDataList.remove(0);
            commitList.add(data112);
        }
        if(commitList.size() > 0){
            db.doBatchData(commitList);
        }
       
        while(userStateSqlList.size() > 0){
            stateList.add(userStateSqlList.remove(0));
        }
        if(stateList.size() > 0){
            db.doBatch(stateList);
        }
       
        db.Close();
        System.out.println("flash buffer at " + CommonHelper.getCurrentDateTime());
    }

    public void run() {       
        this.beginDealData();       
    }
}



其中webservice的操作,其中需要jar包,请下载apache.axis,XStream包,并加入到库中。其中axis主要是用来进行webServie请求用,而XStream主要使用将返回的XML数据,翻译为java对象。

    因为是java project项目,因此少不了main函数了。代码如下:


public class Shell {   

public static void main(String[] args){

  boolean isthread = true;

       FetchData fetch = new FetchData();   

       if(isthread){           

           new Thread(fetch).start();
           new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
            new Thread(fetch).start();
        }else{
            ResInfo resInfo = fetch.getResInfo("productID");
            System.out.println("out_err_id: " + resInfo.out_err_id);
            System.out.println("out_err_msg: " + resInfo.out_err_msg);

            System.out.println("linebox: " + resInfo.linebox);
            System.out.println("linebox_addr: " + resInfo.linebox_addr);
            System.out.println("pcable: " + resInfo.pcable);
            System.out.println("ocable: " + resInfo.ocable);
        }

25.    }

26.}

猜你喜欢

转载自yuhuiblog6338999322098842.iteye.com/blog/2158057