(1)并发:多种线程操作相同的资源,保证线程安全,合理使用资源
(2)高并发:服务能同时处理很多请求,提高程序性能
(3)知识技能
Concurrency Level: 50 Time taken for tests: 0.173 seconds Complete requests: 1000 Failed requests: 0 Total transferred: 136000 bytes HTML transferred: 4000 bytes Requests per second: 5764.98 [#/sec] (mean) Time per request: 8.673 [ms] (mean) Time per request: 0.173 [ms] (mean, across all concurrent requests) Transfer rate: 765.66 [Kbytes/sec] received
定义:当多个线程访问某个类时,不管运行时环境采用**何种调度方式**或者进程如何交替执行,并且在 **主调代码中不需要任何额外的同步或协同**,这个类都能表现出**正确的行为**,那么称这个类是线程安全的。
(1)原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作
(2)可见性:一个线程对主内存的修改可以即使被其他线程观察到
(3)有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序
// UnSafe类中的方法 @HotSpotIntrinsicCandidate public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); // 获取对象o中的底层值 // offset == v 修改成功,反之循环继续判断 } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; } @HotSpotIntrinsicCandidate public final boolean weakCompareAndSetIntRelease(Object o, long offset,int expected,int x) { return compareAndSetInt(o, offset, expected, x); //CAS }
public class VarhandleFoo { volatile int x; private Point[] points; private static final VarHandle QA;//for arrays private static final VarHandle X;//for Variables static { try { QA = MethodHandles.arrayElementVarHandle(Point[].class); X = MethodHandles.lookup(). // Lockup类 findVarHandle(Point.class, "x", int.class); //X = MethodHandles.lookup().in(Point.class).findVarHandle(Point.class, "x", int.class); } catch (ReflectiveOperationException e) { throw new Error(e); } } class Point { // ... } }
//plain read int x = (int) X.get(this); Point p = (Point) QA.get(points,10); //plain write X.set(this,1); QA.set(points,10,new Point()); //CAS X.compareAndSet(this,0,1); QA.compareAndSet(points,10,p,new Point()); //Numeric Atomic Update X.getAndAdd(this,10);
//获得给定对象偏移量上的int值 public native int getInt(Object o,long offset); //设置给定对象偏移量上的int值 public native void putInt(Object o,long offset, int x); //获得字段在对象中的偏移量 public native long objectFieldOffset(Field f); public native long staticFieldOffset(Field f); //设置给定对象的int值,使用volatile语义 public native void putIntVolatile(Object o,long offset,int x); //获得给定对象对象的int值,使用volatile语义 public native int getIntVolatile(Object o,long offset); //和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的 public native void putOrderedInt(Object o,long offset,int x); // cas设值 public final boolean compareAndSwapInt(Object o, long offset, int expected, int x); // 内存屏障 public void fullFence(); public void loadFence(); public void storeFence(); public void loadLoadFence(); public void storeStoreFence(); //------------------数组操作--------------------------------- //获取给定数组的第一个元素的偏移地址 public native int arrayBaseOffset(Class<?> arrayClass); //获取给定数组的元素增量地址,也就是说每个元素的占位数 public native int arrayIndexScale(Class<?> arrayClass); //--------------------锁指令(synchronized)------------------------------- //对象加锁 public native void monitorEnter(Object o); //对象解锁 public native void monitorExit(Object o); public native boolean tryMonitorEnter(Object o); //解除给定线程的阻塞 public native void unpark(Object thread); //阻塞当前线程 public native void park(boolean isAbsolute, long time); //------------------内存操作---------------------- // 在本地内存分配一块指定大小的新内存,内存的内容未初始化;它们通常被当做垃圾回收。 public native long allocateMemory(long bytes); //重新分配给定内存地址的本地内存 public native long reallocateMemory(long address, long bytes); //将给定内存块中的所有字节设置为固定值(通常是0) public native void setMemory(Object o, long offset, long bytes, byte value); //复制一块内存 public native void copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes); //释放给定地址的内存 public native void freeMemory(long address);
volatile boolean inited = false; // 线程1 context = loadContext(); inited = true; // 线程2 while(!inited) sleep(); doSomethingWithConfig(context);
注:集合遍历(foreach、iterator)的时候有对集合进行增删操作将导致ConcurrentModificationException.需要进行更新,可以先打标记,遍历完再进行操作。
ReetrantLock和Synchronized区别
ReetrantLock特有功能
synchronized能够在对象头标识对象所处状态,便于调试
ReentrantReadWriteLock
StampedLock
Condition
任务窃取:线程1执行完自己的任务,则去窃取线程2的任务,为了防止重复执行任务,则从其他线程的尾部进行窃取任务
缺点:
特点:
ForkJoinPool:执行类,执行ForkJoinTask
ForkJoinTask:任务类,需要实现compute方法
- | Throw Exception | Special Value | Blocks | Time Out |
---|---|---|---|---|
Insert | add(o) | offer(o) | put(o) | offer(o,timeout,timeunit) |
Remove | remove(o) | poll() | take() | poll(timeout,timeunit) |
Examine | element() | peek() |
概述
实现图
public interface Data { String getRequest(); } public class FutureClient { public Data request(String queryStr) { FutureData data = new FutureData(); new Thread(()->{ RealData realData = new RealData(queryStr); data.setRealData(realData); }).start(); return data; } } public class FutureData implements Data { private RealData realData; private volatile boolean isComplete = false; @Override public synchronized String getRequest() { while (!isComplete) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return realData.getRequest(); } public synchronized void setRealData(RealData realData) { if (isComplete) return; this.realData = realData; isComplete = true; notify(); } } public class RealData implements Data { private String result; public RealData(String queryStr) { System.out.println("根据" + queryStr + "进行查询,这是一个耗时间5s的操作"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } result = "查询结果"; } @Override public String getRequest() { return result; } } public class Main { public static void main(String[] args) { FutureClient fc = new FutureClient(); Data data = fc.request("请求参数"); System.out.println("请求发送成功"); System.out.println("继续执行其他事情"); // 这个方法会阻塞等待结果执行完成 String result = data.getRequest(); System.out.println(result); } }
概述
实现图
public class Master { // 1.承载任务的集合 private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<>(); // 2.承载所有Worker对象 private Map<String, Thread> workers = new HashMap<>(); // 3. 使用容器承载所有Worker执行任务的结果结合 private Map<String, Object> resultMap = new ConcurrentHashMap<>(); public Master(Worker worker, int workerCount) { worker.setResultMap(resultMap); worker.setTaskQueue(taskQueue); for (int i = 0; i < workerCount; i++) { // key=Worker名字 value=线程执行对象 workers.put("worker" + i, new Thread(worker)); } } public void submit(Task task) { this.taskQueue.add(task); } public void execute() { workers.values().forEach(Thread::start); } public boolean isComplete() { Collection<Thread> threads = workers.values(); for (Thread thread : threads) { if (thread.getState() != Thread.State.TERMINATED) return false; } return true; } public long getResult() { return resultMap.values().stream().mapToInt(obj->(Integer)obj).reduce((sum, val) -> sum+val).getAsInt(); } } @Data public class Task { private int id; private String name; private int price; public Task(int id, String name, int price) { this.id = id; this.name = name; this.price = price; } } @Data public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> taskQueue; private Map<String, Object> resultMap; @Override public void run() { while (true) { Task task = this.taskQueue.poll(); if (task == null) break; Object obj = handle(task); // key=id value=结果 resultMap.put(String.valueOf(task.getId()), obj); } } // 可以作为抽象方法提取出去 private Object handle(Task task) { Object object = null; // 业务耗时 try { Thread.sleep(500); object = task.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } return object; } } public class Main { public static void main(String[] args) { Master master = new Master(new Worker(), 50); Random r = new Random(); for (int i = 1; i <= 100; i++) { master.submit(new Task(i, "任务" + i, r.nextInt(1000))); } master.execute(); long start = System.currentTimeMillis(); while (true) { if (master.isComplete()) { long result = master.getResult(); System.out.println(result); System.out.println("执行时间:" + (System.currentTimeMillis() - start)); break; } } } }
概述
实现图
class ProducerThread implements Runnable { private BlockingQueue<String> blockingQueue; private AtomicInteger count = new AtomicInteger(); private volatile boolean FLAG = true; public ProducerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "生产者开始启动...."); while (FLAG) { String data = count.incrementAndGet() + ""; try { boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "成功.."); } else { System.out.println(Thread.currentThread().getName() + ",生产队列" + data + "失败.."); } Thread.sleep(1000); } catch (Exception e) { } } System.out.println(Thread.currentThread().getName() + ",生产者线程停止..."); } public void stop() { this.FLAG = false; } } class ConsumerThread implements Runnable { private volatile boolean FLAG = true; private BlockingQueue<String> blockingQueue; public ConsumerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "消费者开始启动...."); while (FLAG) { try { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null || data == "") { FLAG = false; System.out.println("消费者超过2秒时间未获取到消息."); return; } System.out.println("消费者获取到队列信息成功,data:" + data); } catch (Exception e) { // TODO: handle exception } } } } public class Main { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); ProducerThread producerThread = new ProducerThread(blockingQueue); ConsumerThread consumerThread = new ConsumerThread(blockingQueue); Thread t1 = new Thread(producerThread); Thread t2 = new Thread(consumerThread); t1.start(); t2.start(); // 10秒后 停止线程.. // 可以使用CountDownLatch来等待子线程结束 try { Thread.sleep(10*1000); producerThread.stop(); } catch (Exception e) { // TODO: handle exception } } }
参数:
执行过程:
状态:
方法:
监控方法:
线程池创建:
合理配置:
使用本地变量
使用不可变类
最小化锁的作用域范围:S=1/(1-a+a/n) 阿姆达尔定律
使用线程池,不直接使用new Thread
使用同步也不要使用线程的wait和notify
使用BlockingQueue实现生产消费模式
使用并发集合而不是加锁的同步集合
使用Semaphore创建有界的访问
宁可使用同步代码块也不适用同步方法
避免使用静态变量(除非final只读)
(1)HashMap
(2)ConcurrentHashMap
java7:分段锁
java8:红黑树
定义了串行系统并行化后的加速比的计算公式和理论上限
加速比定义:加速比=优化前系统耗时/优化后系统耗时
$$
加速比公司:S=1/(1-a+a/n)
$$
$$
加速比公式:S = n-F(n-1)
$$
public class LockFreeList<E> implements List<E> { protected static class Entry<E> { E element; AtomicMarkableReference<Entry<E>> next; } protected AtomicMarkableReference<Entry<E>> head; public LockFreeList() { head = new AtomicMarkableReference<Entry<E>>(null, false); } public boolean add(E e) { if (null == e) throw new NullPointerException(); final Entry<E> newNode = new Entry<E>(e, new AtomicMarkableReference<Entry<E>>(null, false)); while (true) { Entry<E> cur = head.getReference(); newNode.next.set(cur, false); if (head.compareAndSet(cur, newNode, false, false)) { return true; } } } // http://amino-cbbs.sourceforge.net/qs_java.html }
缓存特征:
缓存命中率影响因素:
缓存分类和应用场景:
支持持久化
数据备份,主从模式
读性能11w/s,写性能8w/s
单操作原子性,支持多操作的原子性
subsribe/pushlish通知key过期
缓存一致性问题:
缓存穿透问题
缓存雪崩:
生产和消费的速度或稳定性等因素不一致
业务解耦
最终一致性(两个系统的状态一样,RocketMQ ZeroMQ):交易系统的高可靠
广播
错峰与流控:
Kafka
RabbitMQ
独立的服务共同组成一个系统
单个部署,每个跑在自己的进程中
每个服务为独立的业务开发
分布式管理,强调隔离性
标准:
客户端访问服务:Api Gateway
服务之间通信:
服务发现:zookeeper注册
服务可靠性:
计数器法(1分钟100个)
滑动窗口(10秒一个,每格都有独立的计数器)
漏桶算法
令牌桶算法
public static void nioCopyFile(String resource, String destination) throws IOException { FileInputStream fis = new FileInputStream(resource); FileOutputStream fos = new FileOutputStream(destination); FileChannel readChannel = fis.getChannel(); //读文件通道 FileChannel writeChannel = fos.getChannel(); //写文件通道 ByteBuffer buffer = ByteBuffer.allocate(1024);//读入数据缓存 while (true) { buffer.clear(); int len = readChannel.read(buffer); //读入数据 if (len == -1) break; //读取完毕 buffer.flip(); // 读写切换 writeChannel.write(buffer); //写入文件 } readChannel.close(); writeChannel.close(); }
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw"); FileChannel fc = raf.getChannel(); //将文件映射到内存中 MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length()); while(mbb.hasRemaining()){ System.out.print((char)mbb.get()); } mbb.put(0,(byte)98); //修改文件 raf.close();
参数 | 写模式 | 读模式 |
---|---|---|
position | 从position的下一个位置写数据 | 从position该位置读数据 |
capacity | 缓冲区总容量 | 缓冲区总容量 |
limit | 缓冲区实际上限,通常与capacity相等 | 代表可读容量,与上次写入的数据量相等 |
// 客户端 public class NioClient { private static final int sleepTime = 1000 * 1000 * 1000; public static void main(String[] args) throws IOException { ExecutorService tp = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { tp.execute(new EchoClient()); } } public static class EchoClient implements Runnable { @Override public void run() { Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 8000)); writer = new PrintWriter(client.getOutputStream(), true); writer.print("H"); LockSupport.parkNanos(sleepTime); writer.print("e"); LockSupport.parkNanos(sleepTime); writer.print("l"); LockSupport.parkNanos(sleepTime); writer.print("l"); LockSupport.parkNanos(sleepTime); writer.print("o"); LockSupport.parkNanos(sleepTime); writer.print("!"); LockSupport.parkNanos(sleepTime); writer.println(); // 这行很重要 writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); } catch (IOException e) { e.printStackTrace(); } finally { writer.close(); try { reader.close(); client.close(); } catch (IOException e) { e.printStackTrace(); } } } } } // 服务端 public class BIOServer { public static void main(String[] args) { ExecutorService tp = Executors.newCachedThreadPool(); ServerSocket echoServer = null; Socket clientSocket = null; try { echoServer = new ServerSocket(8000); } catch (IOException e) { System.out.println(e); } while (true) { try { clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connect!"); tp.execute(new HandleMsg(clientSocket)); } catch (IOException e) { System.out.println(e); } } } static class HandleMsg implements Runnable { private Socket clientSocket; public HandleMsg(Socket clientSocket) { this.clientSocket = clientSocket; } public void run() { BufferedReader is = null; PrintWriter os = null; try { is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); // 从InputStream当中读取客户端所发送的数据 os = new PrintWriter(clientSocket.getOutputStream(), true); String inputLine = null; long b = System.currentTimeMillis(); while ((inputLine = is.readLine()) != null) { os.println(inputLine); } long e = System.currentTimeMillis(); System.out.println("spend:" + (e - b) + " ms "); } catch (IOException e) { e.printStackTrace(); } finally { //省略资源关闭 } } } }
(2)Nio
public class NioServer { private ExecutorService tp = Executors.newCachedThreadPool(); private Selector selector; private Map<Socket, Long> geym_time_stat = new HashMap<>(); class EchoClient { private LinkedList<ByteBuffer> outQueue; public EchoClient() { outQueue = new LinkedList<>(); } public LinkedList<ByteBuffer> getOutQueue() { return outQueue; } public void enqueue(ByteBuffer byteBuffer) { outQueue.addFirst(byteBuffer); } } private void startServer() throws IOException { selector = SelectorProvider.provider().openSelector(); // 配置为非阻塞 ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); // 绑定端口 InetSocketAddress address = new InetSocketAddress(8000); serverChannel.socket().bind(address); // 注册socket监听事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); for (; ; ) { selector.select(); Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); long e = 0; while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()) { doAccept(selectionKey); } else if (selectionKey.isValid() && selectionKey.isReadable()) { Socket socket = ((SocketChannel) selectionKey.channel()).socket(); if (!geym_time_stat.containsKey(socket)) { geym_time_stat.put(socket, System.currentTimeMillis()); } doRead(selectionKey); } else if (selectionKey.isValid() && selectionKey.isWritable()) { doWrite(selectionKey); Socket socket = ((SocketChannel) selectionKey.channel()).socket(); e = System.currentTimeMillis(); long b = geym_time_stat.remove(socket); System.out.println("spend:" + (e - b) + "ms"); } } } } private void doWrite(SelectionKey selectionKey) { SocketChannel channel = (SocketChannel) selectionKey.channel(); EchoClient echoClient = (EchoClient) selectionKey.attachment(); LinkedList<ByteBuffer> outQueue = echoClient.getOutQueue(); ByteBuffer buffer = outQueue.getLast(); try { int len = channel.write(buffer); if (len == -1) { disconnect(selectionKey); return; } if (buffer.remaining() == 0) { outQueue.removeLast(); } } catch (IOException e) { System.out.println("Failed: write to client"); e.printStackTrace(); disconnect(selectionKey); } if (outQueue.size() == 0) { selectionKey.interestOps(SelectionKey.OP_READ); } } private void disconnect(SelectionKey selectionKey) { try { selectionKey.selector().close(); selectionKey.channel().close(); } catch (IOException e) { e.printStackTrace(); } } private void doRead(SelectionKey selectionKey) { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(8 * 1024); int len; try { len = channel.read(buffer); if (len < 0) { channel.socket().close(); return; } } catch (IOException e) { e.printStackTrace(); } buffer.flip(); tp.execute(new HandleMsg(selectionKey, buffer)); } class HandleMsg implements Runnable { private SelectionKey selectionKey; private ByteBuffer buffer; public HandleMsg(SelectionKey selectionKey, ByteBuffer buffer) { this.selectionKey = selectionKey; this.buffer = buffer; } @Override public void run() { EchoClient echoClient = (EchoClient) selectionKey.attachment(); echoClient.enqueue(buffer); selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); selector.wakeup(); //强迫selector立即返回 } } private void doAccept(SelectionKey selectionKey) { ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); SocketChannel clientChannel; try { clientChannel = server.accept(); clientChannel.configureBlocking(false); SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); EchoClient echoClient = new EchoClient(); clientKey.attach(echoClient); InetAddress inetAddress = clientChannel.socket().getInetAddress(); System.out.println("accepted from :" + inetAddress.getHostAddress()); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NioServer nioServer = new NioServer(); try { nioServer.startServer(); } catch (IOException e) { e.printStackTrace(); } } }
(3)AIO
public class AIOEchoServer { private AsynchronousServerSocketChannel server; public static void main(String[] args) throws IOException { AIOEchoServer aioServer = new AIOEchoServer(); aioServer.init("localhost", 8000); } private void init(String host, int port) throws IOException { //ChannelGroup用来管理共享资源 AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10); server = AsynchronousServerSocketChannel.open(group); //通过setOption配置Socket server.setOption(StandardSocketOptions.SO_REUSEADDR, true); server.setOption(StandardSocketOptions.SO_RCVBUF, 16 * 1024); //绑定到指定的主机,端口 server.bind(new InetSocketAddress(host, port)); System.out.println("Listening on " + host + ":" + port); //等待连接,并注册CompletionHandler处理内核完成后的操作。 server.accept(null, new CompletionHandler<>() { final ByteBuffer buffer = ByteBuffer.allocate(1024); @Override public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println(Thread.currentThread().getName()); buffer.clear(); try { //把socket中的数据读取到buffer中 result.read(buffer).get(); buffer.flip(); System.out.println(System.currentTimeMillis()+" Echo " + new String(buffer.array()).trim() +" to"+result.getRemoteAddress()); //把收到的直接返回给客户端 result.write(buffer); buffer.flip(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { //关闭处理完的socket,并重新调用accept等待新的连接 result.close(); server.accept(null, this); } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, Object attachment) { System.out.print("Server failed...." + exc.getCause()); } }); //因为AIO不会阻塞调用进程,因此必须在主进程阻塞,才能保持进程存活。 try { Thread.sleep(Integer.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } }