场景:
某对象被访问,并累计访问次数
特点:
1.表中该对象初始没有纪录
2.该对象首次被访问后,为其建立一条纪录
3.此后每次被访问,访问次数++
4.该对象在表中有且仅有一条纪录
分析一下这个场景:
0.为表的对象字段建立unique索引,确保同一个对象在表中仅有一条纪录
1.访问次数为共享数据,且有读和写两个操作,涉及并发
2.最先考虑以代码锁进行防并发,但是基于以下几个缺点放弃:
(1)代码锁无法就单个特定对象加锁,即对象1和对象2被访问时,一律加锁带来性能大规模下降,可能此时对象1面临并发,对象2并不面临并发,也一并给锁了
(2)代码锁在分布式场景下失效
此时考虑以乐观锁来解决
**********************
乐观锁解释:
对于添加了@Version
的注解,我们不需要手动去控制,每一次save操作会在原来的基础上+1,如果初始为null,则springdata自动设置其为0。
主线程和新线程获取了同一行记录,并且新线程优先提交了事务,版本号一致,修改成功。等到了主线程再想save提交事务时,便得到一个版本号不一致的异常,那么在项目开发中就应该自己捕获这个异常根据业务内容做对应处理,是重试还是放弃etc…
1)先读task表的数据(实际上这个表只有一条记录),得到version的值为versionValue
2)每次更新task表中的value字段时,为了防止发生冲突,需要这样操作
update task set value = newValue,version = versionValue + 1 where version = versionValue;
只有这条语句执行了,才表明本次更新value字段的值成功
如假设有两个节点A和B都要更新task表中的value字段值,差不多在同一时刻,A节点和B节点从task表中读到的version值为2,那么A节点和B节点在更新value字段值的时候,都操作 update task set value = newValue,version = 3 where version = 2;,实际上只有1个节点执行该SQL语句成功,假设A节点执行成功,那么此时task表的version字段的值是3,B节点再操作update task set value = newValue,version = 3 where version = 2;这条SQL语句是不执行的,这样就保证了更新task表时不发生冲突
**********************
开始
数据表结构:
CREATE TABLE `user_come` (
`user_come_id` int(11) NOT NULL AUTO_INCREMENT, `come_date` datetime DEFAULT NULL, `user_id` int(11) DEFAULT NULL, `version` int(20) DEFAULT NULL, PRIMARY KEY (`user_come_id`), UNIQUE KEY `user_id` (`user_id`) ) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='';这里的user_id 即为对象,只不过该对象是个人
建立实体:
- @Entity
- @Table(name="user_come")
- public class UserCome {
- @Id
- @Column(name = "user_come_id")
- private int userComeId;
- @Column(name = "user_id")
- private int userId;
- @Temporal(TemporalType.TIMESTAMP)
- @Column(name = "come_date")
- private Date comeDate;
- @Version
- private Integer version;
- 。。。
- }
version为乐观锁检测字段
- while (true) {
- try {
- UserCome userCome = userComeDao.find(Integer.parseInt(userId));
- a: if (userCome == null) {
- userCome = new UserCome();
- userCome.setUserId(Integer.parseInt(userId));
- userCome.setComeDate(new Date());
- } else {
- userCome.setComeDate(new Date());
- }
- userComeDao.save(userCome);
- break;
- } catch (ObjectOptimisticLockingFailureException e) {
- continue;
- } catch (DataIntegrityViolationException e) {
- continue;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
一:首先考虑没有则新建,当A、B两个线程同时访问时,可能出现
A读取null记录-B读取null记录-A插入-B插入
这样首先会异常,因为user_id为unique索引字段,其次,两次访问计数应当是2,而不是1+异常
对于这种情况,我们捕获DataIntergrityViolationException,然后重新来一遍,
A读取null记录-B读取null记录-A插入记录1-B异常-B读取记录1-B更新记录1
一下,userId为1,将断点打至a,然后手动操作,以userid为1建立一条记录,继续运行,报错,继续运行,回到a,查到一条userid为1的记录,修改
二:考虑已有记录,当A、B两个线程同时访问时,可能出现
A读取记录1-B读取记录1-A给记录1count+1-A修改-B给记录1count+1 ——B修改
在非锁情况下,这套流程将本来时count+2的情况变成了count+1,计数少了
再以加了乐观锁的代码
测试一下,userid为1,将断点打至a,查看记录1的count(此处为version)为2,手动操作数据库,将count改为3,以此模拟此时有另外线程修改了数据,继续走,报异常,回到a,在此查找记录1,count为3,save,ok,查看数据库,count已经为4,乐观锁可行
值得注意的是,我的函数并没有加@Transaction声明事务,反而加了后,version不再报异常,但是也没修改数据
乐观锁与cas机制同理,最终以硬件级别的锁(这里是数据库原子操作)判断版本是否已经被修改(前者利用数据库原子性,后者利用硬件级别的原子操作
顺便复习下AtomicInteger,AtomicInteger本质是基于CAS的乐观锁
全部过程有两个关键点:violate和cas硬件层面的阻塞
语录:
从内存领域来说这是乐观锁,因为它在对共享变量更新之前会先比较当前值是否与更新前的值一致,如果是,则更新,如果不是,则无限循环执行(称为自旋),直到当前值与更新前的值一致为止,才执行更新。
下面就分析一下:
先说violate,
AtomicInteger属性:
- privatevolatileint /**
- * Creates a new AtomicInteger with the given initial value.
- *
- * @param initialValue the initial value
- */ publicint /**
- * Creates a new AtomicInteger with initial value {@code 0}.
- */ public }
value是一个volatile变量,在内存中可见,任何线程都不允许对其进行拷贝,因此JVM可以保证任何时刻任何线程总能拿到该变量的最新值。
看看getAndIncrement()方法是怎么利用CAS实现的。
- /**
- * Atomically increments by one the current value.
- *
- * @return the previous value
- */ publicfinalint returnthis);
- }
此处调用unsafe 包的方法,将AtomicIntger对象和value的偏移量,用于底层硬件级别cas的取得value值,其功能相当于引用传递(如实现)
- publicfinalintlongint int do
- while
- return }
- /**
- * Atomically update Java variable to <tt>x</tt> if it is currently
- * holding <tt>expected</tt>.
- * @return <tt>true</tt> if successful
- */ publicfinalnativebooleanlong
- int int x);
我稍微解释一下,其实compareAndSwapInt的注释解释的很明确,原子的将变量的值更新为x,如果成功了返回true,我们知道,如果我们创建AtomicInteger实例时不传入参数,则原始变量的值即为0,所以上面//----------0-----------处得到的v的值即为0,1处的代码为:
while(!compareAndSwapInt(o, offset, 0, 1))我们知道offset指向的地址对应的值就是原始变量的初值0,所以与期望的值0相同,所以将初值赋值为1,返回true,取反后为false,循环结束,返回v即更新之前的值0. 这就是类似于i++操作的原子操作的实现,当然最终CAS的实现都是native的,用实现的,我们这里看不到源码,有时间我会反编译一下这段代码看看。
CAS线程安全
说了半天,我们要回归到最原始的问题了:这样怎么实现线程安全呢?请大家自己先考虑一下这个问题,其实我们在语言层面是没有做任何同步的操作的,大家也可以看到源码没有任何锁加在上面,可它为什么是线程安全的呢?这就是Atomic包下这些类的奥秘:语言层面不做处理,我们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性即可实现基于原子操作的线程安全。所以说,CAS并不是无阻塞,只是阻塞并非在语言、线程方面,而是在硬件层面,所以无疑这样的操作会更快更高效!
主要引用:
其实现为:
int
compare_and_swap (
int
* reg,
int
oldval,
int
newval)
{
ATOMIC();
int
old_reg_val = *reg;
if
(old_reg_val == oldval)
*reg = newval;
END_ATOMIC();
return
old_reg_val;
}
但后来我想到一个问题:就++这个场景
int
compare_and_swap (
int
* reg)
{
ATOMIC();
*reg++;
END_ATOMIC();
return
*reg;
}