nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1

本文主要是介绍nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

单位用了 nacos 来保存配置,前一段时间研究了下 nacos 的原理,今天做个整理和总结。

为什么在 nacos 服务端改了配置文件之后,应用端会立刻生效?

原来我以为服务端在修改了配置文件之后会把结果推送给应用端,后来看了代码之后才发现不是这样的。

简单的说一下,在应用端有一个线程会不断的查询服务端,我感兴趣的某些文件有没有发生变化:

  1. 如果服务端的配置文件有了变化之后,会立刻告诉应用端,某些文件发生了改变。紧接着应用端会根据返回的改变了的文件信息再去 nacos 服务端查询改变了的文件的文件内容。查到内容之后再做相关的变量的改变。
  2. 如果文件没有改变,则 nacos 服务端会有一个异步任务,如果在超时时间内配置文件:
    1. 没有改变,则到点儿后调度任务会响应给应用端,告诉应用端你关心的文件没有发生变化
    2. 如果有改变的话,则会立即给应用端发送响应,告诉应用端,你关心的哪些文件发生了改变。

下面我们来上代码来详细的看一下这个过程。

应用端有一个线程在长轮询 服务端的 /v1/cs/configs/listener 这个服务

/v1/cs/configs/listener 这个服务对应着 nacos 服务端的 com.alibaba.nacos.config.server.controller.ConfigController.listener() 这个方法

1. 让我们来看看 listener 这个方法的实现:

@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {throw new IllegalArgumentException("invalid probeModify");}probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {clientMd5Map = MD5Util.getClientMd5Map(probeModify);}catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}
  1. probeModify 是由分隔符拼起来的一个字符串,这个变量代表着应用关心的文件和文件的内容的 md5 集合。

    每个文件是由 dataId + groupId + md5 + tenantId(namespaceId) 确定的。dataId + 单词分隔符 + groupId + 单词分隔符 + tenantId(namespaceId)+ 行分隔符 如果应用有多个配置文件的话,则每个配置文件之间由行分隔符分隔。这个分隔与客户端的版本还有是否传了 tenantId 有关,我这儿列出的只是其中的一种。

  2. 接下来我们再看看 clientMd5Map 这个变量

    clientMd5Map 这个 map 的 key 代表了应用关心的文件,value 则代表了应用端关心的文件的内容对应的 md5 值。

  3. 再看看 inner.doPollingConfig() 这个方法调用。

/**轮询接口
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize)
throws IOException {// 长轮询
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + “”;
}// else 兼容短轮询逻辑
List changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);// 兼容短轮询result
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version =2.0.0;
}
int versionNum = Protocol.getVersionNumber(version);/**2.0.4版本以前, 返回值放入header中
*/
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute(“content”, newResult);
}
Loggers.AUTH.info(new content:+ newResult);// 禁用缓存
response.setHeader(“Pragma”, “no-cache”);
response.setDateHeader(“Expires”, 0);
response.setHeader(“Cache-Control”, “no-cache,no-store”);
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + “”;
}

只看支持长轮询的逻辑,短轮询的逻辑更简单。重点只看这一行代码的调用

    longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);

把这个长轮询客户端请求添加到长轮询服务中了。

2. 再接着看 com.alibaba.nacos.config.server.service.LongPollingService.addLongPollingClient() 这个方法

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);/*** 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance*/long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// do nothing but set fix polling timeout} else {long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",clientMd5Map.size(), probeRequestSize, changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}

看了下代码,总结了下固定轮询和非固定轮询的差异:

  1. 固定轮询只有在定时调度任务到了之后才会给出应用端响应,换句话说,如果服务器端修改了配置,固定轮询有很大可能不能立即收到文件变更的响应。
  2. 非固定轮询在询问的时候首先会查看一下配置文件是否发生了改变,如果是的话,就立即给出响应。如果不是的话,则加入定时调度任务。如果定时调度还没有开始执行,这时候服务端修改了配置文件,则会把定时调度任务取消,并且立即给出应用端响应,应用端几乎是实时收到服务端给出的文件变更响应。

默认是非固定轮询,首先查看应用端关心的配置文件是否发生了改变,即 changedGroups.size() 是否大于 0 。如果配置文件发生了改变的话,则立即给出响应,并告诉应用,哪些文件发生了改变。

如果配置文件没有改变,并且 noHangUpFlag 为 true 的时候,记录完日志之后就 return 了,由于没有启用异步调用,所以给应用响应的是空。

一般是在应用启动的时候 noHangUpFlag 才为真,应用首次加载配置文件,这个时候不能一直处于等待状态。

再接着往下看,如果配置文件没有改变并且应用端允许服务端挂起自己时,则会让线程池调度任务立即执行 ClientLongPolling 任务。

        String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));

ClientLongPolling 是 LongPollingService 的一个内部类:

package com.alibaba.nacos.config.server.service;import com.alibaba.nacos.config.server.model.SampleResult;import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;import com.alibaba.nacos.config.server.monitor.MetricsMonitor;import com.alibaba.nacos.config.server.utils.GroupKey;import com.alibaba.nacos.config.server.utils.LogUtil;import com.alibaba.nacos.config.server.utils.MD5Util;import com.alibaba.nacos.config.server.utils.RequestUtil;import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;import org.apache.commons.lang3.StringUtils;import org.springframework.stereotype.Service;import javax.servlet.AsyncContext;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.util.*;import java.util.concurrent.*;import static com.alibaba.nacos.config.server.utils.LogUtil.memoryLog;import static com.alibaba.nacos.config.server.utils.LogUtil.pullLog;/*** 长轮询服务。负责处理** @author Nacos*/@Servicepublic class LongPollingService extends AbstractEventListener {private static final int FIXED_POLLING_INTERVAL_MS = 10000;private static final int SAMPLE_PERIOD = 100;private static final int SAMPLE_TIMES = 3;private static final String TRUE_STR = "true";private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();private static boolean isFixedPolling() {return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);}private static int getFixedPollingInterval() {return SwitchService.getSwitchInteger(SwitchService.FIXED_POLLING_INTERVAL, FIXED_POLLING_INTERVAL_MS);}public boolean isClientLongPolling(String clientIp) {return getClientPollingRecord(clientIp) != null;}public Map<String, String> getClientSubConfigInfo(String clientIp) {ClientLongPolling record = getClientPollingRecord(clientIp);if (record == null) {return Collections.<String, String>emptyMap();}return record.clientMd5Map;}public SampleResult getSubscribleInfo(String dataId, String group, String tenant) {String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);SampleResult sampleResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));}}sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return sampleResult;}public SampleResult getSubscribleInfoByIp(String clientIp) {SampleResult sampleResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (clientLongPolling.ip.equals(clientIp)) {// 一个ip可能有多个监听if (!lisentersGroupkeyStatus.equals(clientLongPolling.clientMd5Map)) {lisentersGroupkeyStatus.putAll(clientLongPolling.clientMd5Map);}}}sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return sampleResult;}/*** 聚合采样结果中的采样ip和监听配置的信息;合并策略用后面的覆盖前面的是没有问题的** @param sampleResults sample Results* @return Results*/public SampleResult mergeSampleResult(List<SampleResult> sampleResults) {SampleResult mergeResult = new SampleResult();Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);for (SampleResult sampleResult : sampleResults) {Map<String, String> lisentersGroupkeyStatusTmp = sampleResult.getLisentersGroupkeyStatus();for (Map.Entry<String, String> entry : lisentersGroupkeyStatusTmp.entrySet()) {lisentersGroupkeyStatus.put(entry.getKey(), entry.getValue());}}mergeResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);return mergeResult;}public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {if (allSubs == null || allSubs.isEmpty()) {return null;}HashMap<String, Set<String>> app2Groupkeys = new HashMap<String, Set<String>>(50);for (ClientLongPolling clientLongPolling : allSubs) {if (StringUtils.isEmpty(clientLongPolling.appName) || "unknown".equalsIgnoreCase(clientLongPolling.appName)) {continue;}Set<String> appSubscribeConfigs = app2Groupkeys.get(clientLongPolling.appName);Set<String> clientSubscribeConfigs = clientLongPolling.clientMd5Map.keySet();if (appSubscribeConfigs == null) {appSubscribeConfigs = new HashSet<String>(clientSubscribeConfigs.size());}appSubscribeConfigs.addAll(clientSubscribeConfigs);app2Groupkeys.put(clientLongPolling.appName, appSubscribeConfigs);}return app2Groupkeys;}public SampleResult getCollectSubscribleInfo(String dataId, String group, String tenant) {List<SampleResult> sampleResultLst = new ArrayList<SampleResult>(50);for (int i = 0; i < SAMPLE_TIMES; i++) {SampleResult sampleTmp = getSubscribleInfo(dataId, group, tenant);if (sampleTmp != null) {sampleResultLst.add(sampleTmp);}if (i < SAMPLE_TIMES - 1) {try {Thread.sleep(SAMPLE_PERIOD);} catch (InterruptedException e) {LogUtil.clientLog.error("sleep wrong", e);}}}SampleResult sampleResult = mergeSampleResult(sampleResultLst);return sampleResult;}public SampleResult getCollectSubscribleInfoByIp(String ip) {SampleResult sampleResult = new SampleResult();sampleResult.setLisentersGroupkeyStatus(new HashMap<String, String>(50));for (int i = 0; i < SAMPLE_TIMES; i++) {SampleResult sampleTmp = getSubscribleInfoByIp(ip);if (sampleTmp != null) {if (sampleTmp.getLisentersGroupkeyStatus() != null&& !sampleResult.getLisentersGroupkeyStatus().equals(sampleTmp.getLisentersGroupkeyStatus())) {sampleResult.getLisentersGroupkeyStatus().putAll(sampleTmp.getLisentersGroupkeyStatus());}}if (i < SAMPLE_TIMES - 1) {try {Thread.sleep(SAMPLE_PERIOD);} catch (InterruptedException e) {LogUtil.clientLog.error("sleep wrong", e);}}}return sampleResult;}private ClientLongPolling getClientPollingRecord(String clientIp) {if (allSubs == null) {return null;}for (ClientLongPolling clientLongPolling : allSubs) {HttpServletRequest request = (HttpServletRequest) clientLongPolling.asyncContext.getRequest();if (clientIp.equals(RequestUtil.getRemoteIp(request))) {return clientLongPolling;}}return null;}public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);/*** 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance*/long timeout = Math.max(10000, Long.parseLong(str) - delayTime);if (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// do nothing but set fix polling timeout} else {long start = System.currentTimeMillis();List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {generateResponse(req, rsp, changedGroups);LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",clientMd5Map.size(), probeRequestSize, changedGroups.size());return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,changedGroups.size());return;}}String ip = RequestUtil.getRemoteIp(req);// 一定要由HTTP线程调用,否则离开后容器会立即发送响应final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制asyncContext.setTimeout(0L);scheduler.execute(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}@Overridepublic List<Class<? extends Event>> interest() {List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();eventTypes.add(LocalDataChangeEvent.class);return eventTypes;}@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// ignore} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent)event;scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}static public boolean isSupportLongPolling(HttpServletRequest req) {return null != req.getHeader(LONG_POLLING_HEADER);}@SuppressWarnings("PMD.ThreadPoolCreationRule")public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);t.setName("com.alibaba.nacos.LongPolling");return t;}});scheduler.scheduleWithFixedDelay(new StatTask(), 0L, 10L, TimeUnit.SECONDS);}// =================static public final String LONG_POLLING_HEADER = "Long-Pulling-Timeout";static public final String LONG_POLLING_NO_HANG_UP_HEADER = "Long-Pulling-Timeout-No-Hangup";final ScheduledExecutorService scheduler;/*** 长轮询订阅关系*/final Queue<ClientLongPolling> allSubs;// =================class DataChangeTask implements Runnable {@Overridepublic void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {// 如果beta发布且不在beta列表直接跳过if (isBeta && !betaIps.contains(clientSub.ip)) {continue;}// 如果tag发布且不在tag列表直接跳过if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {continue;}getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // 删除订阅关系LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - changeTime),"in-advance",RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),"polling",clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());}}DataChangeTask(String groupKey) {this(groupKey, false, null);}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {this(groupKey, isBeta, betaIps, null);}DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {this.groupKey = groupKey;this.isBeta = isBeta;this.betaIps = betaIps;this.tag = tag;}final String groupKey;final long changeTime = System.currentTimeMillis();final boolean isBeta;final List<String> betaIps;final String tag;}// =================class StatTask implements Runnable {@Overridepublic void run() {memoryLog.info("[long-pulling] client count " + allSubs.size());MetricsMonitor.getLongPollingMonitor().set(allSubs.size());}}// =================class ClientLongPolling implements Runnable {@Overridepublic void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());/*** 删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(),(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}void sendResponse(List<String> changedGroups) {/***  取消超时任务*/if (null != asyncTimeoutFuture) {asyncTimeoutFuture.cancel(false);}generateResponse(changedGroups);}void generateResponse(List<String> changedGroups) {if (null == changedGroups) {/*** 告诉容器发送HTTP响应*/asyncContext.complete();return;}HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();try {String respString = MD5Util.compareMd5ResultString(changedGroups);// 禁用缓存response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);asyncContext.complete();} catch (Exception se) {pullLog.error(se.toString(), se);asyncContext.complete();}}ClientLongPolling(AsyncContext ac, Map<String, String> clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {this.asyncContext = ac;this.clientMd5Map = clientMd5Map;this.probeRequestSize = probeRequestSize;this.createTime = System.currentTimeMillis();this.ip = ip;this.timeoutTime = timeoutTime;this.appName = appName;this.tag = tag;}// =================final AsyncContext asyncContext;final Map<String, String> clientMd5Map;final long createTime;final String ip;final String appName;final String tag;final int probeRequestSize;final long timeoutTime;Future<?> asyncTimeoutFuture;}void generateResponse(HttpServletRequest request, HttpServletResponse response, List<String> changedGroups) {if (null == changedGroups) {return;}try {String respString = MD5Util.compareMd5ResultString(changedGroups);// 禁用缓存response.setHeader("Pragma", "no-cache");response.setDateHeader("Expires", 0);response.setHeader("Cache-Control", "no-cache,no-store");response.setStatus(HttpServletResponse.SC_OK);response.getWriter().println(respString);} catch (Exception se) {pullLog.error(se.toString(), se);}}public Map<String, Long> getRetainIps() {return retainIps;}public void setRetainIps(Map<String, Long> retainIps) {this.retainIps = retainIps;}}

代码挺长的,在这里我们先看 ClientLongPolling 的 run() 方法。

    @Overridepublic void run() {asyncTimeoutFuture = scheduler.schedule(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());/*** 删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest)asyncContext.getRequest(),(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {sendResponse(changedGroups);} else {sendResponse(null);}} else {LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",(System.currentTimeMillis() - createTime),"timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),"polling",clientMd5Map.size(), probeRequestSize);sendResponse(null);}} catch (Throwable t) {LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());}}}, timeoutTime, TimeUnit.MILLISECONDS);allSubs.add(this);}

在 run() 方法里做了两件事:

  1. 第一件事就是用线程池调度任务又调度了一个任务。
  2. 第二件事就是把 ClientLongPolling 加入到 allSubs 客户端订阅集合中。

先看第一件事,过 timeoutTime 时间后会执行这个任务,再看这个任务干了些啥,挑重点的看一下:

   allSubs.remove(ClientLongPolling.this);

删除订阅关系,为什么要删除呢?因为客户端会长轮询,并不是服务端推送给客户端,如果不删除的话,客户端每轮询一次就往进去添加一次, allSubs 会越来越大,最终结果就是把内存撑爆了。

如果是固定轮询的话,则检查应用端关心的文件是否发生了改变,如果改变了的话则把改变了的文件 key 信息响应给应用,如果没改变的话,则响应数据为空。

因为服务端在修改配置文件的时候不会通知固定轮询,所以固定轮询只能在定时调度任务执行的时候去检查是否文件发生了变更,检查的间隔时间是固定的,因此叫固定轮询。

如果是非固定轮询的话,则给应用一个空的响应。

为什么非固定轮询给一个空的响应呢?因为非固定轮询在询问之初就会检查一下配置文件是否发生了改变,如果发生了改变了的话就会立即给出响应。还有就是应用处于等待状态,在这个等待过程中如果服务端修改了配置文件则会取消定时调度任务,然后立即给出响应。所以在定时调度任务中给出空的响应,文件发生了变更会在别的地方给出响应的。

再接着看看第二件事:

 allSubs.add(this);

为什么要把 this(ClientLongPolling) 加入到 allSubs 中呢?这儿埋下了伏笔,为以后服务端修改了配置文件后取消定时任务/立即给出应用端响应埋下了伏笔。

画了一个简单的流程图,省略了一些内容,保留了主干流程来帮助大家理解。

在这里插入图片描述

涉及到的内容不少,还有一大片代码,剩下的部分会在下一篇中讲述。服务端在修改配置文件的时候做了哪些事。

这篇关于nacos 原理之为什么修改了配置文件后应用端会立刻生效-服务端篇1的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799277

相关文章

Python标准库之数据压缩和存档的应用详解

《Python标准库之数据压缩和存档的应用详解》在数据处理与存储领域,压缩和存档是提升效率的关键技术,Python标准库提供了一套完整的工具链,下面小编就来和大家简单介绍一下吧... 目录一、核心模块架构与设计哲学二、关键模块深度解析1.tarfile:专业级归档工具2.zipfile:跨平台归档首选3.

使用IDEA部署Docker应用指南分享

《使用IDEA部署Docker应用指南分享》本文介绍了使用IDEA部署Docker应用的四步流程:创建Dockerfile、配置IDEADocker连接、设置运行调试环境、构建运行镜像,并强调需准备本... 目录一、创建 dockerfile 配置文件二、配置 IDEA 的 Docker 连接三、配置 Do

深入浅出SpringBoot WebSocket构建实时应用全面指南

《深入浅出SpringBootWebSocket构建实时应用全面指南》WebSocket是一种在单个TCP连接上进行全双工通信的协议,这篇文章主要为大家详细介绍了SpringBoot如何集成WebS... 目录前言为什么需要 WebSocketWebSocket 是什么Spring Boot 如何简化 We

Java Stream流之GroupBy的用法及应用场景

《JavaStream流之GroupBy的用法及应用场景》本教程将详细介绍如何在Java中使用Stream流的groupby方法,包括基本用法和一些常见的实际应用场景,感兴趣的朋友一起看看吧... 目录Java Stream流之GroupBy的用法1. 前言2. 基础概念什么是 GroupBy?Stream

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

C#中的Converter的具体应用

《C#中的Converter的具体应用》C#中的Converter提供了一种灵活的类型转换机制,本文详细介绍了Converter的基本概念、使用场景,具有一定的参考价值,感兴趣的可以了解一下... 目录Converter的基本概念1. Converter委托2. 使用场景布尔型转换示例示例1:简单的字符串到

Spring Boot Actuator应用监控与管理的详细步骤

《SpringBootActuator应用监控与管理的详细步骤》SpringBootActuator是SpringBoot的监控工具,提供健康检查、性能指标、日志管理等核心功能,支持自定义和扩展端... 目录一、 Spring Boot Actuator 概述二、 集成 Spring Boot Actuat

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分