CountDownLatch实践

原创 esjson 并发编程实战 2641

看过很多关于CountDownLatch的文章,在此来简述下我对于CountDownLatch类的一些见解和亲身实践场景

初识CountDownLatch

首先来介绍下并发包下的CountDownLatch:

  • CountDownLatch:一个同步工具类,也称为闭锁,允许一个或多个线程处于等待状态,直到一组操作在其它线程里执行完。

  • CountDownLatch在实例初始化的时候会给定一个计数量(count),一旦线程调用实例的await()方法,这个线程将会阻塞直到CountDownLatch实例的计数置为0,线程才恢复执行,并且这个计数是不可重置的,因为它只提供了countdown() 用来给计数自减 方法,计数减到0后,没有其他方法可以修改这个计数。

使用场景:

  • 场景1:开始信号。

    项目中常用于一些任务等待其他事件(例如初始化工作完毕)达成后才开始工作,此时的CountDownLatch就相当于田径赛跑的开跑枪,枪响了比赛才正式开始,选手们才能向终点发起冲刺。

    我的场景1实践:在我的某个项目中存在API路由模块中,这个应用采用spring boot构建,有个功能点是:APi路由发现并监视注册到zookeeper中的API服务的示例列表,当请求到api路由时,路由根据请求的api编码去负载转发到具体的APi服务,然后再把结果响应回请求方。
    路由模块中有个关键的类用来维护API编码和API示例节点的关系,这个类必须在整个应用启动完毕处理外界请求前先初始化完成,要不然就会造成请求找不到对应的服务响应了。篇幅有限,只上部分代码:

@Component
public class AppInitializing implements InitializingBean  {
private static final Logger logger = LoggerFactory.getLogger(AppInitializing.class);
    private static final ConcurrentHashMap<String, ApiInfo> apiCodeMap = new ConcurrentHashMap<String, ApiInfo>();

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    @Value("${zkClient.servers}")
    private String zooServers;

    @Value("${zkClient.sessionTimeout}")
    private int sessionTimeout;

    @Value("${zkClient.connectionTimeout}")
    private int connectionTimeout;

    @Value("${zkClient.apiParentNode}")
    private String apiServiceNode;

    private ZkClient client;


    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("容器启动执行代码===========================");
 apiServiceNode);
        Thread thread = new Thread(new ApiRegisterFindService());
        thread.start();
        countDownLatch.await();//等待服务map初始化完毕
        logger.info("容器启动后代码完毕===============================");
    }

    private class ApiRegisterFindService implements Runnable {

        @Override
        public void run() {
            logger.info("api服务发现线程启动===================");
           ......
             //这里是复杂的监听发现服务代码
           ......

            countDownLatch.countDown();//由1置为0,即主线程可以继续下一步了
        }
    }

     /**
     * 根据apiCode获取api服务信息
     *
     * @param apiCode
     * @return
     */
    public ApiInfo getApiInfoByApiCode(String apiCode) {
        return apiCodeMap.get(apiCode);
    }

}
  • 场景2:任务分解汇总。

    简单举例,一个量比较大的计算任务A,单独运行计算要花很长时间,如果这些量平均分配到多个小任务中同时计算后,再汇总,将会节省很多时间,你应该可以联想到分布式计算。

场景2实践:某天运维欧巴提了个需求,现有的后台有个 根据DNS地址和域名解析 域名的(所有动态)ip 的功能,这个解析是调用系统的dig命令完成的,后台主要对命令执行后的响应信息进行解析,程序里得等到同个域名的所有结果出来后才做相应处理,但是呢,这个DNS地址量是非常大的,每个地址调用命令去解析都有个时间耗时,大概在3~5秒左右,可想而知,串行执行的话简直就是噩梦,尤其是运维欧巴在系统后台点击执行,然后ajax请求等待的,等待过程基本都不用做其他操作了,为了能在两分钟内执行完,再调用await方法时多加了个超时参数,超过时间的数据不再处理(当然事先征得了运维欧巴的同意)。整个代码逻辑抽象下,大概是这样的:

class LookupIpsRun implements Runnable {

        String host;//域名
        String[] dnsArr; //dns地址数组
        List<String> ips; // 解析结果存放的地方
        CountDownLatch latch; // CountDownLatch实例引用

        LookupIpsRun(String[] dnsArr, List<String> ips, String host, CountDownLatch latch) {
            this.host = host;
            this.dnsArr = dnsArr;
            this.ips = ips;
            this.latch = latch;
        }

        @Override
        public void run() {

            for (String dnsItem : dnsArr) {
                if (dnsItem != null&& dnsItem.length()>0 ) {
                    Set<String> lookupIps = DnsUtils.nsLookup(dnsItem, host);
                    if (!CollectionUtils.isEmpty(lookupIps)) {
                        ips.addAll(lookupIps);//存放解析结果
                    }
                }
            }
            latch.countDown();
        }

    }



    public void gameHostIpLookUp() {
        String dnss = game.getStr("dns");
        String[] dnsArr = dnss.split(";");
        Map<String, Set<String>> dnsIpsMap = new HashMap<String, Set<String>>();
        Set<String> hostIps = newHashSet<String>();
        if (dnsArr.length > 10) {
            final CountDownLatch latch = new CountDownLatch(5);
            final List<String> ips1 = new ArrayList<String>();
            final List<String> ips2 = new ArrayList<String>();
            final List<String> ips3 = new ArrayList<String>();
            final List<String> ips4 = new ArrayList<String>();
            final List<String> ips5 = new ArrayList<String>();
            int index = 0;
            int avg = dnsArr.length / 5;
            final String[] dnsArr1 = Arrays.copyOfRange(dnsArr, index, index += avg);
            final String[] dnsArr2 = Arrays.copyOfRange(dnsArr, index, index += avg);
            final String[] dnsArr3 = Arrays.copyOfRange(dnsArr, index, index += avg);
            final String[] dnsArr4 = Arrays.copyOfRange(dnsArr, index, index += avg);
            final String[] dnsArr5 = Arrays.copyOfRange(dnsArr, index, dnsArr.length);
            new Thread(new LookupIpsRun(dnsArr1, ips1, hostItem, latch)).start();
            new Thread(new LookupIpsRun(dnsArr2, ips2, hostItem, latch)).start();
            new Thread(new LookupIpsRun(dnsArr3, ips3, hostItem, latch)).start();
            new Thread(new LookupIpsRun(dnsArr4, ips4, hostItem, latch)).start();
            new Thread(new LookupIpsRun(dnsArr5, ips5, hostItem, latch)).start();
            try {
                 latch.await(2L, TimeUnit.MINUTES);
                 if (!CollectionUtils.isEmpty(ips1)) {
                     hostIps.addAll(ips1);
                 }
                 if (!CollectionUtils.isEmpty(ips2)) {
                 hostIps.addAll(ips2);
                }
                if (!CollectionUtils.isEmpty(ips3)) {
                    hostIps.addAll(ips3);
                }
                if (!CollectionUtils.isEmpty(ips4)) {
                    hostIps.addAll(ips4);
                }
                if (!CollectionUtils.isEmpty(ips5)) {
                    hostIps.addAll(ips5);
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        } else {
            for (String dnsItem : dnsArr) {
                if (dnsItem != null) {
                    Set<String> lookupIps = DnsUtils.nsLookup(dnsItem, hostItem);
                    if (!CollectionUtils.isEmpty(lookupIps)) {
                        hostIps.addAll(lookupIps);
                    }
                }
            }
        }
        ....//处理最后得到的结果hostIps            
    }

以上代码很多优化的空间和不严谨的地方,在此只是作为一个实践例子。

CountDownLatch 并发 原创
取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

关于我

过所爱的生活,爱所过的生活,快乐的生活,才能生活快乐,快乐的工作,才有快乐人生,生活的理想其实就是理想的生活!