ConcurrentHashMap扩容原理 | 存储流程 | 源码探究

2024-08-29 11:28

本文主要是介绍ConcurrentHashMap扩容原理 | 存储流程 | 源码探究,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

新人写手,代码菜鸡;笔下生涩,诚惶诚恐。

初试锋芒,尚显青涩;望君指点,愿受教诲。 

本篇文章将从源码的层面,探讨ConcurrentHashMap的存储流程以及扩容原理

Java版本为JDK17,源代码可能与其他版本略有不同

推荐阅读:HashMap实现原理、扩容机制

 一、构造函数

1.1 无参构造函数

ConcurrentHashMap的无参构造函数是一个空方法

public ConcurrentHashMap() {
}

1.2 指定容量、负载因子

多了一个并发级别concurrencyLevel,没看出来大用途emm......

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel)   // Use at least as many binsinitialCapacity = concurrencyLevel;   // as estimated threadslong size = (long)(1.0 + (long)initialCapacity / loadFactor);// 找到大于等于 容量 / 负载因子 + 1 的2的幂int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}

1.3 传入Map

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {this.sizeCtl = DEFAULT_CAPACITY;putAll(m);
}

二、关键成员变量

扩容相关的变量都用到了volatile关键字作修饰,保证变量并发可见性

// 默认大小,树化阈值等与HashMap相同
// ...
// 存储元素的Node数组
transient volatile Node<K,V>[] table;
// 协助扩容会用到
private transient volatile Node<K,V>[] nextTable;
// 大于0时,为扩容阈值,等于0时,为初始状态,小于0时,表示Map处于扩容中等一些特殊状态
// 扩容时,该值的低十六位用于表示共同扩容线程数
private transient volatile int sizeCtl;
// 扩容下标指针,用于记录数组扩容到了哪一个元素,从数组末尾开始
private transient volatile int transferIndex;
// 使用Unsafe高效读取和修改变量值
private static final Unsafe U = Unsafe.getUnsafe();
private static final long SIZECTL= U.objectFieldOffset(test.ConcurrentHashMap.class, "sizeCtl");
private static final long TRANSFERINDEX= U.objectFieldOffset(test.ConcurrentHashMap.class, "transferIndex");
// 扩容标志位数
private static final int RESIZE_STAMP_BITS = 16;
// 最大允许扩容线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 扩容标志位左移位数,扩容时用于将sizeCtl置为负数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 表示节点的状态,一般扩容时会将正在扩容的节点hash值修改为以下值
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
// 系统可用线程数,用于计算扩容步长
static final int NCPU = Runtime.getRuntime().availableProcessors();

三、存储流程

3.1 put流程

put方法会调用putVal方法,业务逻辑都在putVal中

public V put(K key, V value) {return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {// key 和 value都不能为nullif (key == null || value == null) throw new NullPointerException();// key 的 hash 值int hash = spread(key.hashCode());int binCount = 0;// 并发场景基本都用到了for循环,执行插入时,若map正在扩容导致插入失败// 待扩容完成后可在此尝试插入for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh; K fk; V fv;// table为null或长度为0,执行初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 如果当前table数组i位置没有元素,直接以cas方式插入当前待插入节点else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))break;                   // no lock when adding to empty bin}// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)else if ((fh = f.hash) == MOVED)// 则辅助进行扩容,具体后面说tab = helpTransfer(tab, f);// 走到这里,说明table[i]位置节点存在,并且数据不在扩容// 和HashMap类似,onlyIfAbsent为true时,// 仅当节点不存在时,才会插入;// 如果存在,不会更新节点值// 同样的,判断节点相同,需要hash值和key值都一致else if (onlyIfAbsent // check first node without acquiring lock&& fh == hash&& ((fk = f.key) == key || (fk != null && key.equals(fk)))&& (fv = f.val) != null)return fv;else {V oldVal = null;// 锁住当前节点synchronized (f) {// 再次判断当前节点是否发生了变化,防止被其他线程提前修改if (tabAt(tab, i) == f) {// fh < 0时,代表三个特殊状态// static final int MOVED     = -1; 当前节点正在扩容// static final int TREEBIN   = -2; 当前节点是一颗树// static final int RESERVED  = -3; 保留当前位置// 这里fh >= 0表示当前节点f为单节点(不是树或链表),或者为链表if (fh >= 0) {// 链表长度binCount = 1;// 开始寻找待插入节点在链表中的位置// 找不到则直接挂载到链表尾部for (Node<K,V> e = f;; ++binCount) {K ek;// 找到了,直接更新数据(当然onlyIfAbsent为false)if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 如果链表最后都没有找到,新建一个Node挂到后面if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key, value);break;}}}// 如果为树,则遍历树,更新或插入节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}// 防止递归更新产生死锁else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}if (binCount != 0) {// 如果链表长度大于等于8,尝试将链表转化为树// 只有table容量大于等于64才会尝试转化,否则优先扩容if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 计数+1,并检查是否需要扩容(是否达到扩容阈值)addCount(1L, binCount);return null;
}
  • 由于需要保证线程安全,所以并不是每一个插入操作都是一次成功(如插入时Map正在扩容),所以用到了for循环;
  • 插入元素时,用到了Synchronized锁,锁粒度具体到Node[]数组中的某个Node节点;
  • 加锁后又判断了一次if (tabAt(tab, i) == f),类似双重校验锁,ConcurrentHashMap大量使用到了这种处理方式。

3.2 get流程

public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 获取hash值int h = spread(key.hashCode());// 确保能查到元素,否则直接返回nullif ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {// hash和key都相同,返回值if ((eh = e.hash) == h) {if ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// 节点hash值小于0,表示可能处于扩容状态else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 遍历链表,继续查找key相同的节点,返回值while ((e = e.next) != null) {if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;
}// 类似,遍历链表查找key
Node<K,V> find(int h, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;} while ((e = e.next) != null);}return null;
}

四、扩容原理

在说传参为Map的构造函数的时候,见到过这样的方法

public void putAll(Map<? extends K, ? extends V> m) {tryPresize(m.size());for (Map.Entry<? extends K, ? extends V> e : m.entrySet())putVal(e.getKey(), e.getValue(), false);
}

接下来就从tryPresize方法入手

4.1 tryPresize 尝试扩容

private final void tryPresize(int size) {// 如果size >= max / 2,扩容后容量就用max// 否则使用1.5 * size + 1向上扩为2的幂次方int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;// sizeCtl >= 0表示不处于扩容等异常状态while ((sc = sizeCtl) >= 0) {Node<K,V>[] tab = table; int n;// putAll方法可能会执行此分支,table为空if (tab == null || (n = tab.length) == 0) {// 初始化长度sc够就用sc,否则用前面算出来的cn = (sc > c) ? sc : c;// cas把当前map状态置为扩容中,确保只有一个线程进入if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {try {// 确保table引用没被修改// 创建新数组// 将sizeCtl赋值为容量的0.75倍if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}}}// 数组容量已经大于所需要的容量,或者容量达到最大值// 无需扩容,直接退出循环else if (c <= sc || n >= MAXIMUM_CAPACITY)break;else if (tab == table) {// 获取扩容标志// 假设n = 16// 此时rs为1000 0000 0001 1011int rs = resizeStamp(n);// 扩容标志会使得低到高第16位为1,左移16位,保证sc为负数// 此时sc修改为1000 0000 0001 1011 0000 0000 0000 0010// 即-2145714174if (U.compareAndSetInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}
}
  • if (U.compareAndSetInt(this, SIZECTL, sc, -1))保证只有一个线程执行new Node[]操作
  • 扩容标志,用在协助扩容的时候,识别两个线程处于同一个扩容任务中

4.2 transfer 扩容方法

计算步长与初始化

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 每个线程迁移数据的步长,数组长度 / 8 / cpu可用线程数// 最小为16if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 初始化走这if (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")// 新建数组,容量为之前的两倍,并赋值Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// 赋值给volatile变量// private transient volatile Node<K,V>[] nextTable;nextTable = nextTab;// private transient volatile int transferIndex;transferIndex = n;}// ...... 
}

线程接受扩容任务

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// 新数组长度int nextn = nextTab.length;// 扩容节点,挂载的是新数组ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 任务标识符,为true时表示需要接受扩容任务boolean advance = true;// 结束标识符,为true时表示扩容结束boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 需要接受任务时while (advance) {int nextIndex, nextBound;// 第三个分支i被赋值后或扩容结束走此分支// 表示当前线程已接受任务if (--i >= bound || finishing)advance = false;// 数组所有节点扩容任务均分配完成else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 修改扩容下标,假设需要扩容的数组长度为32,步长16// 则每次循环transferIndex会减16,直到减为0// nextBound记录当前循环到的transferIndex下标// i赋值为nextIndex-1,(bound - i)即为当前线程需要数据迁移的数组下标// 接受过任务,将advance置为false,不再进入下一循环else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}}
}

 bound -> i即为当前线程需要进行数据迁移的数组下标。循环结束时,若此时没有其他线程协助扩容,当前线程会再次接受新的一批迁移任务

扩容后处理 

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// ......for (int i = 0, bound = 0;;) {// ......// 当前线程没有分配到任务时会满足i < 0条件if (i < 0 || i >= n || i + n >= nextn) {int sc;// 扩容结束,nextTable置为null// nextTab赋值给table// 阈值置为新数组容量的0.75倍// 第一次进入为false,之后finishing被赋值为true,执行finishing逻辑if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}// cas使sizeCtl = sizeCtl - 1if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 第一个线程进入此,不会走下面的if// 进入transfer方法时,满足(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT// 第二次进入,sc = sizeCtl比原来的值少1,可以进入if条件,执行return;// 这里的判断,是为了检查所有辅助扩容线程是否均已扩容完毕// 每个协助扩容线程都会将sc + 1if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 扩容结束finishing = advance = true;// i重新赋值为数组长度,让最后一个线程检查数组是否迁移完毕,并赋值sc为新的扩容阈值i = n; // recheck before commit}}// ......}
}

数据迁移

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {// ......// ......for (int i = 0, bound = 0;;) {// ......// ......// 走到之后的分支意味着线程接收到了任务// 如果数组i位置为null,不需要迁移,并将数组i位置hash置为MOVED,代表已经迁移过else if ((f = tabAt(tab, i)) == null)/*ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null);this.nextTable = tab;}fwd hash为-1,key、value均为null*/advance = casTabAt(tab, i, null, fwd);// hash为-1,说明该节点已经迁移过else if ((fh = f.hash) == MOVED)advance = true; // already processed// 进入节点迁移逻辑else {// 上锁,锁的是node数组f节点synchronized (f) {// 校验引用if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// 单节点或链表部分if (fh >= 0) {// 和HashMap类似,将节点hash与n进行按位与分为两组// 拆成低位链表和高位链表// 低位链表放在新数组i位置,高位链表则为n + i位置int runBit = fh & n;Node<K,V> lastRun = f;// 这里的循环是为了找到链表最后面的节点族的第一个节点// 即保证lastRun节点及其之后的节点同属于高/低位链表// 如此设计,最后遍历链表遍历到lastRun即可,因为lastRun之后的节点都和lastRun同属于一批for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 先将最后的节点族赋值if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}// 再次遍历f节点链表,开始构建高/低位链表for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 除了lastRun及其之后的链表是尾插法,其余遍历的链表采用头插法if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 新数组赋值,并将旧书组i位置置为fwd,标记已经迁移setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}// 树节点同样分为高低位数else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 拆分成两棵树后,判断长度是否小于6,决定是否需要红黑数链表化ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}// 递归更新抛出异常else if (f instanceof ReservationNode)throw new IllegalStateException("Recursive update");}}}}
}

这里的链表迁移与HashMap略有不同:

HashMap的高低位链表,相对顺序与原顺序相同;

而ConcurrentHashMap,lastRun及其之后的节点顺序保持一致,在链表尾部。其他的节点会以头插法的方式加入到新数组中,如下图,橙色1 4 5为低位,2 3 6 7为高位

4.3 helpTransfer 协助扩容

putVal方法中,当前节点hash为-1时,线程会协助进行扩容

// 如果当前当前节点元素hash值为-1,表示正在扩容(扩容时,扩容节点hash值会置为-1)
else if ((fh = f.hash) == MOVED)// 则辅助进行扩容,具体后面说tab = helpTransfer(tab, f);
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {Node<K,V>[] nextTab; int sc;// 旧数组不为空,并且当前f节点为ForwardingNode// 并且f.nextTable也不为空,transfer方法中,nextTable为新数组if (tab != null && (f instanceof ForwardingNode) &&(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {// 扩容戳int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;// 新老数组都是同一个,并且sizeCtl处于扩容状态,协助进行扩容// 不相同可能是扩容完成了,或者发生了新的扩容while (nextTab == nextTable && table == tab &&(sc = sizeCtl) < 0) {// 扩容时,sizeCtl低16位代表扩容的线程数,首次进入时被设置为了2// 当此时同时扩容数达到上限,或者开始执行最终扩容检查时(最终检查时,sizeCtl == rs + 1)// 或者扩容下标为0(任务全部分配)时,退出if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||transferIndex <= 0)break;// sizeCtl + 1,表示此时扩容线程+1if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {// 协助扩容transfer(tab, nextTab);break;}}return nextTab;}return table;
}

transfer方法中,将nextTab赋值给了volatile变量nextTable,旧数组长度n赋值给了transferIndex,此处用来判断是否是同一个扩容任务以及快速判断扩容是否完成。

结语:相对于HashMap,ConcurrentHashMap源码稍微绕一点。本文起到辅助理解作用,其他方法(remove等)原理类似,感兴趣可自行阅读源码。

这篇关于ConcurrentHashMap扩容原理 | 存储流程 | 源码探究的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117728

相关文章

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

Java Spring 中 @PostConstruct 注解使用原理及常见场景

《JavaSpring中@PostConstruct注解使用原理及常见场景》在JavaSpring中,@PostConstruct注解是一个非常实用的功能,它允许开发者在Spring容器完全初... 目录一、@PostConstruct 注解概述二、@PostConstruct 注解的基本使用2.1 基本代

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

关于MongoDB图片URL存储异常问题以及解决

《关于MongoDB图片URL存储异常问题以及解决》:本文主要介绍关于MongoDB图片URL存储异常问题以及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录MongoDB图片URL存储异常问题项目场景问题描述原因分析解决方案预防措施js总结MongoDB图

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三

C#中async await异步关键字用法和异步的底层原理全解析

《C#中asyncawait异步关键字用法和异步的底层原理全解析》:本文主要介绍C#中asyncawait异步关键字用法和异步的底层原理全解析,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录C#异步编程一、异步编程基础二、异步方法的工作原理三、代码示例四、编译后的底层实现五、总结C#异步编程

Go 语言中的select语句详解及工作原理

《Go语言中的select语句详解及工作原理》在Go语言中,select语句是用于处理多个通道(channel)操作的一种控制结构,它类似于switch语句,本文给大家介绍Go语言中的select语... 目录Go 语言中的 select 是做什么的基本功能语法工作原理示例示例 1:监听多个通道示例 2:带

鸿蒙中@State的原理使用详解(HarmonyOS 5)

《鸿蒙中@State的原理使用详解(HarmonyOS5)》@State是HarmonyOSArkTS框架中用于管理组件状态的核心装饰器,其核心作用是实现数据驱动UI的响应式编程模式,本文给大家介绍... 目录一、@State在鸿蒙中是做什么的?二、@Spythontate的基本原理1. 依赖关系的收集2.