博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java.util.concurrent.Semaphore 使用
阅读量:6690 次
发布时间:2019-06-25

本文共 8587 字,大约阅读时间需要 28 分钟。

1. 概述

   Semaphore(信号)  并不存在真正的许可 只是维护一个计数器, 通常用来限定进入一些资源的线程数

 accquire()  方法获取许可 成功则计数器值-1 没有则阻塞直到一个可用的许可(即计数器>0)

   release() 方法 潜在的释放了申请人(通过给计数器值+1)

2. 示例一(单独测试信号量增减  availabelPermits对超出数量的线程的阻塞)

package com.rocky.semaphore;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreTest {    public static void main(String[] args) {        Semaphore semaphore = new Semaphore(5, false);//no fair 并行最大为5,阻塞后来的        ExecutorService service = Executors.newCachedThreadPool();        for(int i=0; i<10; i++){            service.execute(new Worker(semaphore));        }        service.shutdown();    }}class Worker implements Runnable{    private Semaphore semaphore;    Worker(Semaphore semaphore){        this.semaphore = semaphore;    }    @Override    public void run() {        try {            semaphore.acquire();//获取许可            try {                System.out.println(Thread.currentThread().getName()+" accessing...");                Thread.sleep((long) (Math.random()*3000));            } finally{                semaphore.release();//释放许可                System.out.println(Thread.currentThread().getName()+" leaving...");            }        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

说明:   默认是非公平的 进来一个线程获取许可, 则state减1,直到值为0 以下是源码片段

final int nonfairTryAcquireShared(int acquires) {
//acquires值为1 for (;;) { int available = getState();//当前state值 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining))//remaining>0 则CAS修改state 成功获取许可 return remaining; } }  //remainimg<0 返回后,执行下面方法 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//创建共享型节点加入等待队列 队列为空则仿制头结点并建立联系 try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);//再次尝试获取许可 if (r >= 0) {//成功获取许可 setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }

 

3. 示例二(一个生产者与一个消费者 两组信号量 此消彼长 为0阻塞)

package com.rocky.semaphore;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class ProducerCustomRealizeWithSemaphore {    public static void main(String[] args) {        SemaphoreBuffer semaphoreBuffer = new SemaphoreBuffer();        Producer producer = new Producer(semaphoreBuffer);        Customer customer = new Customer(semaphoreBuffer);        ExecutorService service = Executors.newCachedThreadPool();        service.execute(customer);        service.execute(producer);        service.shutdown();    }    }class SemaphoreBuffer{    List
list = new ArrayList
(); Semaphore producerSemaphore = new Semaphore(1);// 允许并行的线程数为1 Semaphore customerSemaphore = new Semaphore(0);// 0即state的初始值 则一开始消费就阻塞了 见上例说明中remaining<0 public void put(int num){ try { producerSemaphore.acquire(); try { list.add(num); } finally{ customerSemaphore.release(); } } catch (InterruptedException e) { e.printStackTrace(); } } public int get(){ try { customerSemaphore.acquire(); try{ return list.remove(0); }finally{ producerSemaphore.release(); } } catch (InterruptedException e) { e.printStackTrace(); } return 0; } }class Customer implements Runnable{ private SemaphoreBuffer buffer; Customer(SemaphoreBuffer buffer){ this.buffer = buffer; } @Override public void run() { while(!Thread.interrupted()){ int num = buffer.get(); System.out.println("Customer get the num "+num); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }}class Producer implements Runnable{ private SemaphoreBuffer buffer; Producer(SemaphoreBuffer buffer){ this.buffer = buffer; } int c =0; @Override public void run() { while(!Thread.interrupted()){ buffer.put(c); System.out.println("Producer put the num "+c); c++; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }

 4. 示例三(多个生产者与多个消费者 运用阻塞队列  线程安全 )

package com.rocky.semaphore;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.Semaphore;import java.util.concurrent.atomic.AtomicInteger;public class ProducerCustomRealizeWithSemaphoreLinkedBlockingQueue {    static AtomicInteger c = new AtomicInteger(1);    public static void main(String[] args) {        ExecutorService service = Executors.newCachedThreadPool();        CakeStand stand = new CakeStand();        service.execute(new CakeProducer(stand, "producer1", c));        service.execute(new CakeProducer(stand, "producer2", c));        service.execute(new CakeProducer(stand, "producer3", c));        service.execute(new CakeCustomer(stand, "customer1"));        service.execute(new CakeCustomer(stand, "customer2"));    }}class Cake{    private String name;    Cake(String name){        this.name = name;    }    public String toString(){        return name;    }}class CakeStand{    BlockingQueue
queue = new LinkedBlockingQueue
(15); Semaphore notFull = new Semaphore(10);//生产信号量 Semaphore notEmpty = new Semaphore(0);//消费信号量 public void put(Cake cake){ try { notFull.acquire(); try{ queue.put(cake); }finally{ notEmpty.release(); } } catch (InterruptedException e) { e.printStackTrace(); } } public Cake take(){ try { notEmpty.acquire(); try{ Cake cake = queue.take(); return cake; }finally{ notFull.release(); } } catch (InterruptedException e) { e.printStackTrace(); } return null; }}class CakeProducer implements Runnable{ private CakeStand stand; private String name; private AtomicInteger c; public CakeProducer(CakeStand stand, String name, AtomicInteger c) { this.stand = stand; this.name = name; this.c = c; } @Override public void run() { while(!Thread.interrupted()){ String str = "cake-"+c.getAndIncrement(); System.out.println("生产:"+name+"-"+str); stand.put(new Cake(str)); try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace(); } } }}class CakeCustomer implements Runnable{ private CakeStand stand; private String name; public CakeCustomer(CakeStand stand, String name){ this.stand = stand; this.name = name; } @Override public void run() { while(!Thread.interrupted()){ Cake cake = stand.take(); System.err.println("消费:"+name+"-"+cake.toString()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }}

说明1): 传统的消费者 生产者使用wait/notify模式等待和相互唤醒, Semaphore通过信号量的值控制运行(>0)和阻塞(<=0),

两组信号量可以使两组角色彼此唤醒,使用阻塞队列可以确保线程安全。

2) 可以额外创建一个信号量Semaphore mutex = new Semaphore(1); 在获取本组信号量之后再获取metex信号量可以实现互斥锁效果

 

转载于:https://www.cnblogs.com/rocky-fang/p/6768142.html

你可能感兴趣的文章
长春7旬老人收藏明信片48年 6千张见证国家变迁
查看>>
最前线|VIPKID正寻求4-5亿美元新一轮融资,估值达60亿美元
查看>>
文 OR 理?答案都在这里!
查看>>
ES6 Module之export
查看>>
XML+JSON面试题都在这里
查看>>
教你如何攻克Kotlin中泛型型变的难点(实践篇)
查看>>
2018Android面试经历
查看>>
不受限对抗样本挑战赛介绍
查看>>
推荐10个Java方向最热门的开源项目(8月)
查看>>
浅解前端必须掌握的算法(三):直接插入排序
查看>>
[译] TensorFlow 教程 #06 - CIFAR-10
查看>>
处理 JavaScript 复杂对象:深拷贝、Immutable & Immer
查看>>
Kotlin 设计模式系列之单例模式
查看>>
阅读SSH的ERP项目【第二篇】
查看>>
如何有效的避免OOM,温故Java中的引用
查看>>
Objective C基础教程 第一章 启程
查看>>
Android开发人员不得不学习的JavaScript基础(一)
查看>>
阿里云在LC3大会上透露未来要做的两件事
查看>>
关于Socket,看我这几篇就够了(三)原来你是这样的Websocket
查看>>
NSHipster: NSRegularExpression 中文版
查看>>