Gahing's blog Gahing's blog
首页
知识体系
  • 前端基础
  • 应用框架
  • 工程能力
  • 应用基础
  • 专业领域
  • 业务场景
  • 前端晋升 (opens new window)
  • Git
  • 网络基础
  • 算法
  • 数据结构
  • 编程范式
  • 编解码
  • Linux
  • AIGC
  • 其他领域

    • 客户端
    • 服务端
    • 产品设计
软素质
  • 面试经验
  • 人生总结
  • 个人简历
  • 知识卡片
  • 灵感记录
  • 实用技巧
  • 知识科普
  • 友情链接
  • 美食推荐 (opens new window)
  • 收藏夹

    • 优质前端信息源 (opens new window)
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Gahing / francecil

To be best
首页
知识体系
  • 前端基础
  • 应用框架
  • 工程能力
  • 应用基础
  • 专业领域
  • 业务场景
  • 前端晋升 (opens new window)
  • Git
  • 网络基础
  • 算法
  • 数据结构
  • 编程范式
  • 编解码
  • Linux
  • AIGC
  • 其他领域

    • 客户端
    • 服务端
    • 产品设计
软素质
  • 面试经验
  • 人生总结
  • 个人简历
  • 知识卡片
  • 灵感记录
  • 实用技巧
  • 知识科普
  • 友情链接
  • 美食推荐 (opens new window)
  • 收藏夹

    • 优质前端信息源 (opens new window)
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 中间件

  • 云原生

  • 分布式

  • 存储技术

  • 数据库

  • 服务部署

  • 编程语言

    • Go

    • Java

      • Android Binder学习笔记
      • BigInteger源码解析
      • CountDownLatch使用之统计任务花费时间
      • Integer 源码方法学习
      • Java Executors 学习笔记
      • Java IO
      • Java NIO
      • NDK-JNI开发笔记
      • OpenGL ES 开发笔记
      • ThreadLocal
      • 「Java并发编程」读书笔记
      • 「垃圾收集」学习笔记
      • 动手写一个并发缓存框架 历程
        • 首先给上我们的耗时任务,和简单web框架搭建
        • 接下来开始搭建我们的并发缓存框架
          • 定义Computable接口实现
          • 创建Memoizer1
          • 改进,使用线程安全的`ConcurrentHashMap`
          • 改进,采用FutureTask
          • 故使用ConcurrentMap中的原子方法`putIfAbsent`
  • 计算技术

  • 服务端
  • 编程语言
  • Java
gahing
2020-06-29
目录

动手写一个并发缓存框架 历程草稿

# 首先给上我们的耗时任务,和简单web框架搭建

提供一个0-20的数字,计算前n项和,为了不使得计算太快 每加一次 sleep

package com.france.servlet;

import java.io.IOException;

import javax.servlet.Servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

public class TaskServlet implements Servlet{


	@Override
	public void service(ServletRequest req, ServletResponse resp)
			throws ServletException, IOException {
		// TODO Auto-generated method stub
			try {
				int num=Integer.valueOf(req.getParameter("num"));
				if(num<=0)throw new Exception("数字<=0");
				if(num>=20)throw new Exception("数字>=20");
				int sum=calculateSumWithSleep(num,500);
				System.out.println("计算得到的结果是:"+sum);
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
	}
	private int calculateSumWithSleep(int num,long ms){
		int sum=0;
		for(int i=0;i<=num;i++){
			sum+=i;
			try {
				Thread.sleep(ms);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return sum;
	}

	@Override
	public void destroy() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public ServletConfig getServletConfig() {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public String getServletInfo() {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public void init(ServletConfig arg0) throws ServletException {
		// TODO Auto-generated method stub
		
	}

}

web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
  <description>MuiltThreadCacheTask</description>
  <display-name>MuiltThreadCacheTask</display-name>
  <servlet>
    <display-name>TaskServlet</display-name>
    <servlet-name>TaskServlet</servlet-name>
    <servlet-class>com.france.servlet.TaskServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>TaskServlet</servlet-name>
    <url-pattern>/TaskServlet</url-pattern>
  </servlet-mapping>
  
  <welcome-file-list>
    <welcome-file>/index.jsp</welcome-file>
  </welcome-file-list>
</web-app>

做个测试:http://localhost:8888/MuiltThreadCacheTask/TaskServlet?num=3 控制台输出6

# 接下来开始搭建我们的并发缓存框架

先将我们刚刚的执行任务函数改成低耦合方式

# 定义Computable接口实现

package com.france;

public interface Computable<A, V> {
	V compute(A arg)throws InterruptedException;
}

Memoizer:用于缓存结果,提供Computable接口并在servlet实现 来执行任务

# 创建Memoizer1

package com.france.memoizer;

import java.util.HashMap;
import java.util.Map;

import com.france.Computable;

public class Memoizer1<A,V> implements Computable<A, V> {
	private final Map<A,V> cache = new HashMap<A,V>();
	private final Computable<A, V> c;
	@Override
	public synchronized V compute(A arg) throws InterruptedException {
		// TODO Auto-generated method stub
		V result=cache.get(arg);
		if(result==null){
			result=c.compute(arg);
			cache.put(arg, result);
		}
		return result;
	}
	public Memoizer1(Computable<A, V> c) {
		this.c = c;
	}		
}

先搭好框架,后面再说这边的弊端

修改原来的Servlet

package com.france.servlet;

import java.io.IOException;

import javax.servlet.Servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;

import com.france.Computable;
import com.france.memoizer.Memoizer1;

public class TaskServlet implements Servlet{
	private final long ms=500;
	private final Computable<Integer,Integer> c =new Computable<Integer,Integer>() {

		@Override
		public Integer compute(Integer arg) throws InterruptedException {
			// TODO Auto-generated method stub
			return calculateSumWithSleep(arg);
		}
	};
	//接口实现子类 可拔插
	private final Computable<Integer,Integer> cahce=new Memoizer1<Integer,Integer>(c);
	
	@Override
	public void service(ServletRequest req, ServletResponse resp)
			throws ServletException, IOException {
		// TODO Auto-generated method stub
			try {
				int num=Integer.valueOf(req.getParameter("num"));
				if(num<=0)throw new Exception("数字<=0");
				if(num>=20)throw new Exception("数字>=20");
				int sum=this.cahce.compute(num);
				System.out.println("计算得到的结果是:"+sum);
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
	}
	private int calculateSumWithSleep(int num){
		int sum=0;
		for(int i=0;i<=num;i++){
			sum+=i;
			try {
				Thread.sleep(ms);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return sum;
	}

	@Override
	public void destroy() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public ServletConfig getServletConfig() {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public String getServletInfo() {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public void init(ServletConfig arg0) throws ServletException {
		// TODO Auto-generated method stub
		
	}

}

做测试(单机),第一次执行很慢,第二次执行从cache中拿,很快。

那么出现的问题是什么呢?

首先先明白一点,implements Servlet后,每个线程执行service方法是独立的,而cahce定义为全局变量,所有线程访问的是同一个引用而不是线程封闭

然后,由于HashMap不是线程安全的,为了保证两个线程不会同时访问HashMap,Memoizer1采用方法是对整个compute方法进行同步 虽然能确保线程安全,但是带来的问题是其他调用compute的线程被堵塞,效果上来说比不使用cache更差

# 改进,使用线程安全的ConcurrentHashMap

package com.france.memoizer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.france.Computable;

public class Memoizer2<A,V> implements Computable<A, V> {
	//将
	private final Map<A,V> cache = new ConcurrentHashMap<A,V>();
	private final Computable<A, V> c;
	//由于线程安全,这里不再使用synchronized
	@Override
	public  V compute(A arg) throws InterruptedException {
		// TODO Auto-generated method stub
		V result=cache.get(arg);
		if(result==null){
			result=c.compute(arg);
			cache.put(arg, result);
		}
		return result;
	}
	public Memoizer2(Computable<A, V> c) {
		this.c = c;
	}
	
	
}

较之前相比,多线程可并发使用它。 但是存在一个较严重的问题,两个线程同时调用compute去计算同一个值会造成重复计算 当任务耗时更久的时候问题将更严重。

图示:

A  ->f(1)不在缓存->[   计算f(1)    ]->将f(1)放入缓存
B  ----------->f(1)不在缓存->[   计算f(1)    ]->将f(1)放入缓存

这样就是同样的任务计算了2次并放了2次缓存,对一个高效的缓存框架来说是糟糕的。

# 改进,采用FutureTask

FutureTask表示一个计算的过程,可能已经计算完成,也可能正在进行。如果有结果可用,那么FutureTask.get将立即返回结果,否则会一直堵塞,直到计算结果出来再将其返回

package com.france.memoizer;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import com.france.Computable;


public class Memoizer3<A,V> implements Computable<A, V> {
	//将V改成Future<V> Future<V>表示一个得到V的过程
	private final Map<A,Future<V>> cache = new ConcurrentHashMap<A,Future<V>>();
	private final Computable<A, V> c;
	@Override
	public  V compute(final A arg) throws InterruptedException {
		// TODO Auto-generated method stub
		//获取Future
		Future<V> future=cache.get(arg);
		if(future==null){
			Callable<V> eval=new Callable<V>() {

				@Override
				public V call() throws Exception {
					// TODO Auto-generated method stub
					return c.compute(arg);
				}
			};
			//定义Callable并传递给Future
			FutureTask<V> ft=new FutureTask<V>(eval);
			future=ft;
			//将执行过程Future存在cache后开始run
			cache.put(arg, ft);
			ft.run();
		}
		try {
			return future.get();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return null;
		}
	}
	public Memoizer3(Computable<A, V> c) {
		this.c = c;
	}
	
	
}

较之前相比,基本不会出现重复计算同一个值的问题。 为什么说基本不会而不是完成不会? 原因在于compute中if代码块是非原子的先检查再执行操作

图示:

A ->f(1)不在缓存中->将f(1)的future放入缓存->[计算f(1)]->通过get()返回结果
B ---->f(1)不在缓存中->将f(1)的future放入缓存->[计算f(1)]->通过get()返回结果

虽然几率小,但还是有可能出现。 复合操作(若没有则添加)是在底层的Map对象执行的,这个对象不能通过加锁来确保原子性**(否则map堵塞将导致其他线程访问堵塞)**

# 故使用ConcurrentMap中的原子方法putIfAbsent

修改处有注释

package com.france.memoizer;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import com.france.Computable;


public class Memoizer4<A,V> implements Computable<A, V> {
	//实现用的ConcurrentHashMap而不是Map
	private final ConcurrentHashMap<A,Future<V>> cache = new ConcurrentHashMap<A,Future<V>>();
	private final Computable<A, V> c;
	@Override
	public  V compute(final A arg) throws InterruptedException {
		// TODO Auto-generated method stub
		Future<V> future=cache.get(arg);
		if(future==null){
			Callable<V> eval=new Callable<V>() {

				@Override
				public V call() throws Exception {
					// TODO Auto-generated method stub
					return c.compute(arg);
				}
			};
			FutureTask<V> ft=new FutureTask<V>(eval);
			future=ft;
			//putIfAbsent 原子方法,详情请看源码 用的是Segment控制
			cache.putIfAbsent(arg, ft);
			ft.run();
		}
		try {
			return future.get();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return null;
		}
	}
	public Memoizer4(Computable<A, V> c) {
		this.c = c;
	}
	
	
}

这样对于**线程安全性和并发性来说是很完美的 但是注意 缓存的是Future而不是值,会导致缓存污染(Cache Pollution)问题: 如果计算被取消或者失败,那么在计算这个结果时(调用get方法时)将指明计算过程被取消或者失败(抛出异常)** 解决:如果Memoizer发现计算被取消或者出现RuntimeException,那么将Future从缓存中移除,这样之后的计算才可能成功(故我们把之前的操作放在一个while(true)循环中去做)

package com.france.memoizer;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import com.france.Computable;


public class Memoizer5<A,V> implements Computable<A, V> {
	private final ConcurrentHashMap<A,Future<V>> cache = new ConcurrentHashMap<A,Future<V>>();
	private final Computable<A, V> c;
	@Override
	public  V compute(final A arg) throws InterruptedException {
		// TODO Auto-generated method stub
		//放在一个循环中去操作 抛出异常时 移除cache 重新操作
		while(true){
			Future<V> future=cache.get(arg);
			if(future==null){
				Callable<V> eval=new Callable<V>() {

					@Override
					public V call() throws Exception {
						// TODO Auto-generated method stub
						return c.compute(arg);
					}
				};
				FutureTask<V> ft=new FutureTask<V>(eval);
				future=ft;
				cache.putIfAbsent(arg, ft);
				ft.run();
			}
			try {
				return future.get();
			}catch(CancellationException e){
				//出现异常 删除缓存
				cache.remove(arg,future);
			}catch (ExecutionException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	public Memoizer5(Computable<A, V> c) {
		this.c = c;
	}
	
	
}

当然,上面还是没有解决缓存逾期和缓存清理的问题 不清除是要从concurrentHashMap下手还是Future下手,求指教

一个想法是结合LruCache和concurrentHashMap实现自己的框架 待续

参考:<<Java并发编程第五章>>

编辑 (opens new window)
上次更新: 2023/08/26, 10:18:07
「垃圾收集」学习笔记
批处理Spark

← 「垃圾收集」学习笔记 批处理Spark→

最近更新
01
我的 2024 总结
12-31
02
浅谈代码质量与量化指标
08-27
03
快速理解 JS 装饰器
08-26
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Gahing | 闽ICP备19024221号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式