多线程(callable+futureTask)去组装数据,并批量入库

2024-06-19 02:32

本文主要是介绍多线程(callable+futureTask)去组装数据,并批量入库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      商城项目,收货地址会用到4级地址(省,市,县,镇),我们只用到了特定城市的。 但是我想通过京东的接口把全部的数据拿出来。于是就有 ------多线程(callable+futureTask)去组装数据。

---------------------------

    先贴下controller的代码:

package com.truelore.xunjia.wssc.test.controller;


import com.alibaba.fastjson.JSON;
import com.truelore.common.util.WsscHttpClientUtils;
import com.truelore.xunjia.wssc.dao.ProvinceDao;
import com.truelore.xunjia.wssc.entity.WsscArea;
import com.truelore.xunjia.wssc.entity.WsscCity;
import com.truelore.xunjia.wssc.entity.WsscProvince;
import com.truelore.xunjia.wssc.service.AreaService;
import com.truelore.xunjia.wssc.service.CityService;
import com.truelore.xunjia.wssc.service.ProvinceService;
import com.truelore.xunjia.wssc.service.TownService;
import com.truelore.xunjia.wssc.vo.JdaddressVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 拿到京东全部的4级地址
 *  fujf
 *  201853114:49:49
 */
@RequestMapping("/wssc/test")
@Controller("testGetallJdaddress")
public class TestGetallJdaddress extends SuperToken {@Autowired
    private ProvinceService provinceService;

    @Autowired
    private ProvinceDao provinceDao;

    @Autowired
    private CityService cityService;

    @Autowired
    private AreaService areaService;

    @Autowired
    private TownService townService;

    @RequestMapping("/saveAddress")public void saveAddress(String level, HttpServletResponse resp){//level ="2";
        Long start_time = System.currentTimeMillis();   //开始时间

        List<String> parentids = new ArrayList<String>();
         if("1".equals(level)){//获取省的时候,没有parentid.  直接保存好了
             List<JdaddressVo> jdaddressVos = getaddressList(level, null);
            //save(jdaddressVos,level);
             batsave(jdaddressVos,level);

        }else if("2".equals(level)){List<WsscProvince> ProvinceList = provinceService.getLocalProvince(99);
             for (WsscProvince wsscProvince : ProvinceList) {parentids.add(wsscProvince.getLocalProvinceId());
             }}else if("3".equals(level)){List<WsscCity> cityList = cityService.queryCityByCondition(null,null,null,99);
             for (WsscCity wsscCity : cityList) {parentids.add(wsscCity.getLocalCityId());
             }}else if("4".equals(level)){List<WsscArea> areaList = areaService.queryAreaByCondition(null,null,null,99);
             for (WsscArea wsscArea : areaList) {parentids.add(wsscArea.getLocalAreaId());
             }}if(!("1".equals(level))){         //不是省级的地址获取,我们就用下面的多线程方式
      List<FutureTask<List<JdaddressVo>>> futureTasks = new ArrayList<FutureTask<List<JdaddressVo>>>();
            ExecutorService executorService = Executors.newFixedThreadPool(50);
            MycallableForaddress callable = null;


            System.out.println("****************");
            for (String parentid : parentids) {callable = new MycallableForaddress(level,parentid);
                FutureTask<List<JdaddressVo>> futureTask = new FutureTask<List<JdaddressVo>>(callable);
                futureTasks.add(futureTask);
                executorService.submit(futureTask);
              while(futureTasks.size()==500){for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                          if(null!=addrlist){//save(addrlist,level);
                              batsave(addrlist,level);   //换成批量保存
                          }} catch (Exception e) {e.printStackTrace();
                      }}futureTasks.clear();
              }}//循环结束,最后不满futureTasks.size()的也要保存起来
            while (futureTasks.size() > 0) {for (FutureTask<List<JdaddressVo>> task : futureTasks) {try {List<JdaddressVo> addrlist = task.get();
                        batsave(addrlist,level);
                    } catch (Exception e) {e.printStackTrace();
                    }}futureTasks.clear();
            }executorService.shutdown();
        }try {Long end_time = System.currentTimeMillis();
            resp.setHeader("Content-type", "text/html;charset=UTF-8");
            resp.getWriter().write("ok");
            resp.getWriter().write("共用时:"+(end_time - start_time)+"毫秒");
        } catch (IOException e) {e.printStackTrace();
        }}//批量保存
    private void batsave(List<JdaddressVo> addrlist, String level) {if("1".equals(level)){for (JdaddressVo jdaddressVo : addrlist) {WsscProvince p = new WsscProvince();
                p.setGuid(UUID.randomUUID().toString());
                p.setLocalProvinceId(jdaddressVo.getAddressId());
                p.setProvinceName(jdaddressVo.getAddressName());
                p.setTarget(99);
                p.setTargetProvinceId(jdaddressVo.getAddressId());
                provinceService.save(p);
            }}else if("2".equals(level)){cityService.batsave(addrlist);
        }else if("3".equals(level)){areaService.batsave(addrlist);
        }else{townService.batsave(addrlist);
      }}//内部线程类    根据上级id返回下级的地址list<JdaddressVo>
    class MycallableForaddress implements Callable{private String level;

        private String parentId;

       public MycallableForaddress(String level, String parentId) {this.level = level;
           this.parentId = parentId;
       }@Override
        public Object call() throws Exception {List<JdaddressVo> jdaddressVos = getaddressList(level, parentId);
            return jdaddressVos;
        }}//返回map<地区名,编号>
      private List<JdaddressVo> getaddressList(String level,String parentId) {String url = null;
          Map maps = new HashMap<String, String>();
          maps.put("token", token);

          if ("1" .equals(level)) {                      //获取省
              url="https://bizapi.jd.com/api/area/getProvince";
          } else if ("2" .equals(level)) {               //获取市
              url="https://bizapi.jd.com/api/area/getCity";
              maps.put("id", parentId);
          } else if ("3" .equals(level)) {                //获取县
              url="https://bizapi.jd.com/api/area/getCounty";
              maps.put("id", parentId);
          } else if ("4".equals(level)) {                 //获取乡
              url="https://bizapi.jd.com/api/area/getTown";
              maps.put("id", parentId);
          } else {return null;
          }String rev = WsscHttpClientUtils.post(url, maps, null);

          if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
              System.out.println(resultmaps.get("success"));
              boolean isSuccess = (boolean) resultmaps.get("success");
              if (isSuccess) {Map<String, Integer> resultmap = (Map) resultmaps.get("result");
                  //遍历map,方法1
                  List<JdaddressVo> JdaddressList = new ArrayList<>();
                  for (Object key : resultmap.keySet()) {System.out.println(key + "---->" + resultmap.get(key));
                      JdaddressVo jdaddress = new JdaddressVo();
                      jdaddress.setAddressId(resultmap.get(key).toString());
                      jdaddress.setAddressName(key.toString());
                      jdaddress.setParentAddressId(parentId);
                      JdaddressList.add(jdaddress);
                  }return JdaddressList;
              }}else{return null;
          }return null;

      }//-------------------以下是单元测试,不用理会-----------------------------------------
   // @Test
    public void getProvinceList(){String url = "https://bizapi.jd.com/api/area/getProvince";
        Map maps = new HashMap<String,String>();
        maps.put("token", token);
        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCityList(){String url = "https://bizapi.jd.com/api/area/getCity";
        String parentId ="6";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getCountyList(){String url = "https://bizapi.jd.com/api/area/getCounty";
        String parentId ="318";
        Map maps = new HashMap<String,String>();
        maps.put("id",parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);
    }// @Test
    public void getTownList() {String url = "https://bizapi.jd.com/api/area/getTown";
        String parentId = "319";
        Map maps = new HashMap<String, String>();
        maps.put("id", parentId);
        maps.put("token", token);

        String rev = WsscHttpClientUtils.post(url, maps, null);
        System.out.println(rev);

        if (null != rev) {Map resultmaps = (Map) JSON.parse(rev);
            System.out.println(resultmaps.get("success"));
            boolean isSuccess = (boolean) resultmaps.get("success");
            if (isSuccess) {Map<String,Integer> resultmap = (Map) resultmaps.get("result");

               //遍历map,方法1
                for (Object key : resultmap.keySet()){System.out.println(key+"---->"+resultmap.get(key));
                }//遍历map,方法2
               /* for (Map.Entry<String,Integer> entry : resultmap.entrySet()){
                    System.out.println(entry.getKey()+"---->"+entry.getValue());
                }*/

                //遍历map,方法3   迭代器
               /* Iterator keys = resultmap.keySet().iterator();
                while (keys.hasNext()){
                   String key = (String) keys.next();
                   System.out.println(key+"--->"+resultmap.get(key));
                }*/

            }}}
}
 

这个是内部线程类的定义。

***DaoImpl中的批量保存代码。

@Override
public void batsave(final List<JdaddressVo> addrs) {this.getSession().doWork(new Work() {@Override
                         public void execute(Connection connection) throws SQLException {String sql = "insert into WSSC_CITY(GUID,CITY_NAME,LOCAL_CITYID,LOCAL_PROVINCEID,TARGET_CITYID,TARGET_PROVINCEID,TARGET) values(?,?,?,?,?,?,?)";
                            PreparedStatement ps = connection.prepareStatement(sql);
                            for (JdaddressVo addr : addrs) {ps.setString(1, UUID.randomUUID().toString());
                               ps.setString(2,addr.getAddressName());
                               ps.setString(3,addr.getAddressId());
                               ps.setString(4,addr.getParentAddressId());
                               ps.setString(5,addr.getAddressId());
                               ps.setString(6,addr.getParentAddressId());
                               ps.setInt(7,99);
                               ps.addBatch();
                            }ps.executeBatch();
                         }});

}


-----------------

经过测试,这样处理,5万条数据导入需要几分钟,快了不少。

------

感受:

要培养一种“批量”,“缓存”的思想,比如上面代码中 的 futureTasks(满500再处理);保存数据时,jdbc去批处理等。

"满一定量再去做"


这篇关于多线程(callable+futureTask)去组装数据,并批量入库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073756

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

Java如何从Redis中批量读取数据

《Java如何从Redis中批量读取数据》:本文主要介绍Java如何从Redis中批量读取数据的情况,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一.背景概述二.分析与实现三.发现问题与屡次改进3.1.QPS过高而且波动很大3.2.程序中断,抛异常3.3.内存消

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock