多线程(callable+futureTask)去组装数据,并批量入库

2024-06-19 02:32

本文主要是介绍多线程(callable+futureTask)去组装数据,并批量入库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      商城项目,收货地址会用到4级地址(省,市,县,镇),我们只用到了特定城市的。 但是我想通过京东的接口把全部的数据拿出来。于是就有 ------多线程(callable+futureTask)去组装数据。

---------------------------

    先贴下controller的代码:

package com.truelore.xunjia.wssc.test.controller;


import com.alibaba.fastjson.JSON;
import com.truelore.common.util.WsscHttpClientUtils;
import com.truelore.xunjia.wssc.dao.ProvinceDao;
import com.truelore.xunjia.wssc.entity.WsscArea;
import com.truelore.xunjia.wssc.entity.WsscCity;
import com.truelore.xunjia.wssc.entity.WsscProvince;
import com.truelore.xunjia.wssc.service.AreaService;
import com.truelore.xunjia.wssc.service.CityService;
import com.truelore.xunjia.wssc.service.ProvinceService;
import com.truelore.xunjia.wssc.service.TownService;
import com.truelore.xunjia.wssc.vo.JdaddressVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 拿到京东全部的4级地址
 *  fujf
 *  201853114:49:49
 */
@RequestMapping("/wssc/test")
@Controller("testGetallJdaddress")
public class TestGetallJdaddress extends SuperToken {@Autowired
    private ProvinceService provinceService;

    @Autowired
    private ProvinceDao provinceDao;

    @Autowired
    private CityService cityService;

    @Autowired
    private AreaService areaService;

    @Autowired
    private TownService townService;

    @RequestMapping("/saveAddress")public void saveAddress(String level, HttpServletResponse resp){//level ="2";
        Long start_time = System.currentTimeMillis();   //开始时间

        List<String> parentids = new ArrayList<String>();
         if("1".equals(level)){//获取省的时候,没有parentid.  直接保存好了
             List<JdaddressVo> jdaddressVos = getaddressList(level, null);
            //save(jdaddressVos,level);
             batsave(jdaddressVos,level);

        }else if("2".equals(level)){List<WsscProvince> ProvinceList = provinceService.getLocalProvince(99);
             for (WsscProvince wsscProvince : ProvinceList) {parentids.add(wsscProvince.getLocalProvinceId());
             }}else if("3".equals(level)){List<WsscCity> cityList = cityService.queryCityByCondition(null,null,null,99);
             for (WsscCity wsscCity : cityList) {parentids.add(wsscCity.getLocalCityId());
             }}else if("4".equals(level)){List<WsscArea> areaList = areaService.queryAreaByCondition(null,null,null,99);
             for (WsscArea wsscArea : areaList) {parentids.add(wsscArea.getLocalAreaId());
             }}if(!("1".equals(level))){         //不是省级的地址获取,我们就用下面的多线程方式
      List<FutureTask<List<JdaddressVo>>> futureTasks = new ArrayList<FutureTask<List<JdaddressVo>>>();
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            MycallableForaddress callable = null;


            System.out.println("****************");
            for (String parentid : parentids) {callable = new MycallableForaddress(level,parentid);
                FutureTask<List<JdaddressVo>> futureTask = new FutureTask<List<JdaddressVo>>(callable);
                futureTasks.add(futureTask);
                executorService.submit(futureTask);
              while(futureTasks.size()==500){for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                          if(null!=addrlist){//save(addrlist,level);
                              batsave(addrlist,level);   //换成批量保存
                          }} catch (Exception e) {e.printStackTrace();
                      }}futureTasks.clear();
              }}//循环结束,最后不满futureTasks.size()的也要保存起来
            while (futureTasks.size() > 0) {for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                        batsave(addrlist,level);
                    } catch (Exception e) {e.printStackTrace();
                    }}futureTasks.clear();
            }executorService.shutdown();
        }try {Long end_time = System.currentTimeMillis();
            resp.setHeader("Content-type", "text/html;charset=UTF-8");
            resp.getWriter().write("ok");
            resp.getWriter().write("共用时:"+(end_time - start_time)+"毫秒");
        } catch (IOException e) {e.printStackTrace();
        }}//批量保存
    private void batsave(List<JdaddressVo> addrlist, String level) {if("1".equals(level)){for (JdaddressVo jdaddressVo : addrlist) {WsscProvince p = new WsscProvince();
                p.setGuid(UUID.randomUUID().toString());
                p.setLocalProvinceId(jdaddressVo.getAddressId());
                p.setProvinceName(jdaddressVo.getAddressName());
                p.setTarget(99);
                p.setTargetProvinceId(jdaddressVo.getAddressId());
                provinceService.save(p);
            }}else if("2".equals(level)){cityService.batsave(addrlist);
        }else if("3".equals(level)){areaService.batsave(addrlist);
        }else{townService.batsave(addrlist);
      }}//内部线程类    根据上级id返回下级的地址list<JdaddressVo>
    class MycallableForaddress implements Callable{private String level;

        private String parentId;

       public MycallableForaddress(String level, String parentId) {this.level = level;
           this.parentId = parentId;
       }@Override
        public Object call() throws Exception {List<JdaddressVo> jdaddressVos = getaddressList(level, parentId);
            return jdaddressVos;
        }}//返回map<地区名,编号>
      private List<JdaddressVo> getaddressList(String level,String parentId) {String url = null;
          Map maps = new HashMap<String, String>();
          maps.put("token", token);

          if ("1" .equals(level)) {                      //获取省
              url="https://bizapi.jd.com/api/area/getProvince";
          } else if ("2" .equals(level)) {               //获取市
              url="https://bizapi.jd.com/api/area/getCity";
              maps.put("id", parentId);
          } else if ("3" .equals(level)) {                //获取县
              url="https://bizapi.jd.com/api/area/getCounty";
              maps.put("id", parentId);
          } else if ("4".equals(level)) {                 //获取乡
              url="https://bizapi.jd.com/api/area/getTown";
              maps.put("id", parentId);
          } else {return null;
          }String rev = WsscHttpClientUtils.post(url, maps, null);

          if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
              System.out.println(resultmaps.get("success"));
              boolean isSuccess = (boolean) resultmaps.get("success");
              if (isSuccess) {Map<String, Integer> resultmap = (Map) resultmaps.get("result");
                  //遍历map,方法1
                  List<JdaddressVo> JdaddressList = new ArrayList<>();
                  for (Object key : resultmap.keySet()) {System.out.println(key + "---->" + resultmap.get(key));
                      JdaddressVo jdaddress = new JdaddressVo();
                      jdaddress.setAddressId(resultmap.get(key).toString());
                      jdaddress.setAddressName(key.toString());
                      jdaddress.setParentAddressId(parentId);
                      JdaddressList.add(jdaddress);
                  }return JdaddressList;
              }}else{return null;
          }return null;

      }//-------------------以下是单元测试,不用理会-----------------------------------------
   // @Test
    public void getProvinceList(){String url = "https://bizapi.jd.com/api/area/getProvince";
        Map maps = new HashMap<String,String>();
        maps.put("token", token);
        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCityList(){String url = "https://bizapi.jd.com/api/area/getCity";
        String parentId ="6";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCountyList(){String url = "https://bizapi.jd.com/api/area/getCounty";
        String parentId ="318";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getTownList() {String url = "https://bizapi.jd.com/api/area/getTown";
        String parentId = "319";
        Map maps = new HashMap<String, String>();
        maps.put("id", parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);

        if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
            System.out.println(resultmaps.get("success"));
            boolean isSuccess = (boolean) resultmaps.get("success");
            if (isSuccess) {Map<String,Integer> resultmap = (Map) resultmaps.get("result");

               //遍历map,方法1
                for (Object key : resultmap.keySet()){System.out.println(key+"---->"+resultmap.get(key));
                }//遍历map,方法2
               /* for (Map.Entry<String,Integer> entry : resultmap.entrySet()){
                    System.out.println(entry.getKey()+"---->"+entry.getValue());
                }*/

                //遍历map,方法3   迭代器
               /* Iterator keys = resultmap.keySet().iterator();
                while (keys.hasNext()){
                   String key = (String) keys.next();
                   System.out.println(key+"--->"+resultmap.get(key));
                }*/

            }}}
}
 

这个是内部线程类的定义。

***DaoImpl中的批量保存代码。

@Override
public void batsave(final List<JdaddressVo> addrs) {this.getSession().doWork(new Work() {@Override
                         public void execute(Connection connection) throws SQLException {String sql = "insert into WSSC_CITY(GUID,CITY_NAME,LOCAL_CITYID,LOCAL_PROVINCEID,TARGET_CITYID,TARGET_PROVINCEID,TARGET) values(?,?,?,?,?,?,?)";
                            PreparedStatement ps = connection.prepareStatement(sql);
                            for (JdaddressVo addr : addrs) {ps.setString(1, UUID.randomUUID().toString());
                               ps.setString(2,addr.getAddressName());
                               ps.setString(3,addr.getAddressId());
                               ps.setString(4,addr.getParentAddressId());
                               ps.setString(5,addr.getAddressId());
                               ps.setString(6,addr.getParentAddressId());
                               ps.setInt(7,99);
                               ps.addBatch();
                            }ps.executeBatch();
                         }});

}


-----------------

经过测试,这样处理,5万条数据导入需要几分钟,快了不少。

------

感受:

要培养一种“批量”,“缓存”的思想,比如上面代码中 的 futureTasks(满500再处理);保存数据时,jdbc去批处理等。

"满一定量再去做"


这篇关于多线程(callable+futureTask)去组装数据,并批量入库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073756

相关文章

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Ubuntu向多台主机批量传输文件的流程步骤

《Ubuntu向多台主机批量传输文件的流程步骤》:本文主要介绍在Ubuntu中批量传输文件到多台主机的方法,需确保主机互通、用户名密码统一及端口开放,通过安装sshpass工具,准备包含目标主机信... 目录Ubuntu 向多台主机批量传输文件1.安装 sshpass2.准备主机列表文件3.创建一个批处理脚

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别

MySQL批量替换数据库字符集的实用方法(附详细代码)

《MySQL批量替换数据库字符集的实用方法(附详细代码)》当需要修改数据库编码和字符集时,通常需要对其下属的所有表及表中所有字段进行修改,下面:本文主要介绍MySQL批量替换数据库字符集的实用方法... 目录前言为什么要批量修改字符集?整体脚本脚本逻辑解析1. 设置目标参数2. 生成修改表默认字符集的语句3

python库pydantic数据验证和设置管理库的用途

《python库pydantic数据验证和设置管理库的用途》pydantic是一个用于数据验证和设置管理的Python库,它主要利用Python类型注解来定义数据模型的结构和验证规则,本文给大家介绍p... 目录主要特点和用途:Field数值验证参数总结pydantic 是一个让你能够 confidentl

JAVA实现亿级千万级数据顺序导出的示例代码

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。实现思路:A.启用线程池

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性