Clojure之atom详解

atom是Clojure的同步原语中最基本的一个组件, 实现上没有什么特别的, 就是对Java的java.util.concurrent.atomic中的AtomicReference的一个wrapper. Atomic功能需要直接的CPU硬件的支持, 而不仅仅是一个第三方软件库而已. 所以应该先从这里讲起.

test and set

这个问题我觉得讲的最好的是"PThreads Primer A Guide to Multithreaded Programming" 的第六章 Synchronization.

test和set是两条指令, 这两条指令会作为一个整体执行, 如同一条指令, 实际的实现中是一条指令, 但是执行两个操作, 这两条指令的间隙不会被中断或者被其他线程抢占. test是将某个内存中的bit加载到寄存器中, set是将这个内存中的bit设置为1. 一个bit只有两种状态, 要么是0, 要么是1. 那么这个操作执行完之后, 内存中的bit的old value保存在寄存器中, 而新的value是1. 这样的话, 这个内存bit可以作为一个锁. 当他的值是0的时候, 表示没有被占用, 值是1的时候表示被占用. 占用一个锁有几种不同的表达方法: acquire the lock, take ownership of the lock, locking the lock. 很显然, 如果这个bit的值已经是1, 那么执行之后, 寄存器中的值和内存中的值都是1, 此时无论执行多少次, 寄存器中的值都是1, 这相当于block, 即没有成功的占用锁, 而如果bit的值是0, 则会被修改为1, 这相当于成功的获得锁. 释放锁只需要简单的将这个bit修改为0即可.

这条指令是最早期的一种方案, 目前所有的处理器都是使用它的升级版: compare and swap(CAS).

test and set这种类型的硬件指令是所有基于锁的并发编程的基础. 各种方案例如互斥, 信号, 关键区域等等可以建立在这条指令的基础上. 所有这些的共同特点就是他们都是悲观锁. 加锁意味着将所有其他线程挡在门外, 确定没有任何危险的时候才去修改共享变量. 与之相对的是乐观锁, 下面要讲到的compare and swap是这种类型的并发的硬件基础. 也就是无锁并发, 这种方案无视可能和其他线程冲突的危险, 只管去执行修改, 但是可以检测到冲突, 如果不成功则重试, 直到成功为止, 在实现上这将会是一个无限循环, 成功的时候跳出.

compare and swap

前面说到了compare and swap是test and set的升级版, 实际上test and set是compare and swap的一个特列, compare and swap更加通用. compare and swap接受三个参数: 一个内存地址, 一个old value, 一个new value.

指令的伪码如下:

 
int compare_and_swap ( int *address, int old_value, int new_value) 
{ 
  int temp = *memory_location;
  if (temp == old_value) 
     *address = new_value;
  return temp;
}
 

上面的操作整体作为一个原子操作, 在一个指令中完成. 意思是内存地址中的值必须和参数old_value相等才执行更新操作, 最后返回内存地址中更新之前的值.

注意到如果将old_value固定为0, new_value固定位1, 而且address里面值只能是0或1, 那么就会等价于test and set. 如果compare_and_swap返回0 说明获得锁, 如果返回1说明没有获得. 所以test and set只是compare and swap的一个特例. 因此现代的处理器基本只需要compare and swap就足够了.

除了可以代替test and set, compare and swap还为实现无锁并发提供了硬件基础. compare and swap中那个被修改的内存地址不再仅仅作为一个flag或者标志位, 而是以作为一个共享变量, 例如一个整型的计数器, 如果有多个线程并发的对这个计数器执行加1的操作, 我们可以在不用锁的情况下利用compare and swap来安全的做到这一点.

这种典型的场景也叫做RMW(READ MODIFY WRITE 读取 修改 写入), 在计数器的例子中, 一个线程需要做的事情是读取计数器的值, 在这个值的基础上加1, 然后写回到计数器当中. 伪码如下

 
int old_value = *address; // R
int new_value = old_value + 1; // M
int cas = compare_and_swap(address, old_value, new_value); // W
 

通过比较compare_and_swap的返回值和old_value是否相等, 我们可以得到两种可能的结论, 如果相等, 说明更新发生了, 内存地址中现在的值应该是new_value, 同时也表明第一句代码开始到最后一句代码执行之间, 内存地址中的值没有其他线程修改过. 反之说明在这之间有其他线程修改了内存的值, 而且当前线程的compare_and_swap中的更新没有执行, 实际上也不能执行, 因为此时的new_value是错误的, 是根据一个已经无效的old_value计算出来的, 此时应该重新执行整个RMW流程, 也许这次仍然会失败, 但是最终在某次重试的时候会成功.

 
for (;;) {
  int old_value = *address; // R
  int new_value = old_value + 1; // M
  int cas = compare_and_swap(address, old_value, new_value); // W
  if( cas == old_value) break;
}
 

如果仔细观察的话, 这个基于cas的方案其实已经具备了典型的事务的所有特征, 虽然是最简单的一种事务. for循环就是事务的开始, 中间的一组操作是事务的内容, 最后的cas操作就是事务提交, 提交失败的时候, 因为提交失败的话是不会修改任何东西的, 因此不需要回滚, 只需要重试即可. 而且事务的隔离级别是Serializable的, 比MVCC的隔离级别还要高一级. 因为即使并发的执行, 最终成功的事务其实是串行的, 不可能两个事务同时成功.

Clojure中的atom变量是对Java 的AtomicReference的包装, 最终也就是基于CAS的, 而ref变量则是基于MVCC的事务控制, 其实原理和cas非常相似, 只是涉及到多个变量的协同, 而且增加了许多更加精细的控制. Clojure中修改atom不需要dosync, 但是对atom的修改本身就是一个隐含的事务, 而且是串行隔离级别的.

compare and swap vs compare and set

除了compare and swap, 我们经常会看到另一个词: compare and set. 例如AtomicReference的一个方法成员就叫做compareAndSet. 这两个概念只有一些非常细微的差别, 所以我们经常可以看到某些材料对着两个概念的混淆.

compare and swap返回的是在该指令执行的时刻, 内存地址中的值, 然而实际使用中, 我们往往关心的是这个值和参数中的old_value是否相等这个boolean值. compare and set就是做这件事情的, 相当于将cas和之后的比较合并为一个函数了

 
bool compare_and_set(int * address , int old_value, new_value) {
  int cas = compare_and_swap(address, old_value, new_value); // W
  return cas == old_value;
}
 
 
 

那么上面的代码可以写成

 
for (;;) {
  int old_value = *address; // R
  int new_value = old_value + 1; // M
  if ( compare_and_set (address, old_value, new_value) ) break; 
}
 

compare and set在Java中是通过Unsafe_CompareAndSwapInt实现的, 如下:

 
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(
  JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
 

java.util.concurrent.atomic 包, 和AtomicReference

这个包从JDK 5.0开始存在, 作用是提供基于单个变量的无锁并发编程. 所有的类都是基于cas指令的. 在不同的平台上最终会使用相应的硬件提供的cas实现.

我们知道在cas中, 内存处存放的不再只是一个标志位, 而是实际的值, 上面的例子默认为int型的整数, 实际应用中我们可能需要值是各种类型的数据, 这个包里提供了四种值的类型: Boolean, Integer, Long, Reference.

前三种和我们前面使用的默认int整型的例子类似, 很容易理解, 这里面的重点是AtomicReference, Clojure的atom正是对这个类的包装. 顾名思义AtomicReference意思就是所要处理的单个变量是一个Reference, Reference和其所值的内容是两回事. 例如一个List的Reference, 即使修改了List中的内容, 而Reference仍然是不变的. 在CAS中, 和old_value比较的时候, AtomicReference用的是==, 即Reference equality, 这和AtomicInteger有本质的不同, AtomicInteger的equality叫做structural equality.

那么这种情况下一个RMW流程中, 得到一个新的Reference的办法只能是将原来的整个数据结构拷贝出来, 然后做相应的修改, 而且实际进行比较的并不是这些数据本身, 而是指向他们的引用. 举例来说, AtomicInteger 和 AtomicReference的区别在哪里? 如果并发的对一个AtomicInteger进行递增, 至始至终只有一个Int对象, 随着时间不断递增, 而如果用AtomicReference来实现的话, 每次增1其实是需要新建一个int对象, 如果增1操作执行了1000次, 意味着前后总共创建了1000个int对象, AtomicReference中保存的Reference则会变化, 不断的变为指向新的int对象. 这实际就是Clojure中identity和state的概念, 变量只是一种绑定, 变量只能建立新的绑定, 而不是改变其中的数据.

而传统的Java数据结构, 例如列表之类的, 他们本身是不支持immutable的, 如果在一个AtomicReference>中引用的是包含有1000元素的列表, 如果要做到线程安全的修改, 每次修改的过程是读取old_value, 拷贝1000个元素, 得到一个新的List, 对这个List做修改, 然后用cas试图将这个新的List引用更新到AtomicReference>中. 考虑到这是有可能重试的, 如果发生重试的话, 刚才拷贝动作要再做一遍. 这种场景下AtomicReference和传统的Java内置数据结构的结合是没有多大意义的.

所以AtomicReference实际使用起来实际是非常tricky的, 因为是违反直觉的.

Reference equality的示例如下:

 
List<Integer> l1 = new ArrayList<Integer>();
l1.add(1);
 
List<Integer> l2 = new ArrayList<Integer>();
l2.add(1);
 
if( l1 == l2) {
    System.out.println ("l1 == l2");
} else {
    System.out.println ("l1 != l2"); // 输出
}
 
List<Integer> l3 = l1;
l3.add(2);
if( l1 == l3) {
    System.out.println ("l1 == l3"); // 输出
} else {
    System.out.println ("l1 != l3");
}
 

Clojure的atom

我们已经知道Clojure的atom只是对AtomicReference的一个包装, 但Clojure同时还有Java所不具备的persistent data structure, 例如vector, map, set. 还是之前的例子, 对一个列表, 进行修改, 如果是Java的话, 如果要使用AtomicReference, 必须将整个列表拷贝出来, 然后修改, 而在Clojure中, 任何修改总是会返回一个新的列表, 而不是简单的拷贝. 换句话说Clojure的PDS是默认immutable的.

 
Clojure atom = PDS + AtomicReference
 

下面的例子, 启动n个线程, 每个线程对atom的值加1, 首先是一些辅助函数

 
(defn tid [] (.getId (Thread/currentThread)))
 
(defn debug  [ msg]
  (print (str msg (apply str (repeat (- 35 (count msg)) \space))  " tid: " (tid)  "\n"))
  (flush))
 
(defn thread-func []
  (debug "Hello , world from tid: ")
)
 
(defn do-thread [func thread-num]
  (let [agents (map #(agent %) (range 1 thread-num))]
    (doseq [agent agents]
      (send-off agent (fn [a] (func) a) )
    )
    (doseq [agent agents]
      (await agent)
      (debug (str "wait for agent: " @agent))
    )
  )
)
 

debug打印一条语句, 末尾会带上所在的线程id, do-thread接受一个函数和一个线程数量, 这将会启动相应数量的线程, 每个线程中执行第一个参数中的函数. do-thread有很多种写法, 这里是其中一种, 利用了agent的特性.

接下来可以测试atom了

 
(def a (atom 0))
 
(defn swap-inc [old-value]
  (debug (str "after inc : a is " (+ old-value 1)))
  (+ old-value 1)
)
 
(defn atom-inc []
  (swap! a swap-inc )
)
 
(do-thread atom-inc 10)