序言:

对于阿里开源分布式事务框架seata的详细了解可以参考官网,这里不会详细介绍。本章只会介绍seata中AT模式的源码分析(对阿seata有一定了解或者成功完成过demo)。

seata中一个事务的开启是由TM角色来完成,在整体事务发起方我们可以通过在执行方法中包含@GlobalTransactional来标示启用全局事务,并包含该全局事务的一定自定义设置。如下所示:

public @interface GlobalTransactional {/*** 设置该全局事务下执行的超时时间 默认毫秒** @return timeoutMills in MILLISECONDS.*/int timeoutMills() default TransactionInfo.DEFAULT_TIME_OUT;/*** 全局事务实例的的名称** @return Given name.*/String name() default "";/*** 设置哪些异常类发生需要进行rollback* @return*/Class<? extends Throwable>[] rollbackFor() default {};/***  设置哪些异常类名发生需要进行rollback* @return*/String[] rollbackForClassName() default {};/*** 设置哪些异常类发生不需要进行rollback* @return*/Class<? extends Throwable>[] noRollbackFor() default {};/*** 设置哪些异常类名发生不需要进行rollback* @return*/String[] noRollbackForClassName() default {};/*** 事务传播级别 默认REQUIRED* * @return*/Propagation propagation() default Propagation.REQUIRED;
}

1:seata客户端-TM(基于springcloud项目分析)

1.0:GlobalTransactionScanner

使用过spring的@Transactional事务的实现知道,它通过动态代理的方式,将事务的创建,提交或回滚这些公干的动作封装到一套执行模版中。这种方式在很多开源框架都是如此构建的例如mybtis中的各执行注解(@Update,@Select等),Springcloud中的Feign调用啊等。通过idea我们可以查看@GlobalTransactional该注解在什么地方被使用到,如下所示:

如果对于springboot的一些开源start(例如mybatis中MapperScannerRegistrar等)项目有过源码走读的经验,从GlobalTransactionScanner名字可以看出该类负责扫描GlobalTransaction注解并构建其代理方法(比较@GlobalTransactionScanner作用在方法中)。继续通过ieda的Find Usages功能寻找GobalTransactionScanner的引用,发现在seata的spring starter项目中的SeataAutoConfiguration对其进行初始化。

我们镜头回到GobalTransactionScanner中(对于SeataAutoConfiguration的其它作用后续描述)。GobalTransactionScanner实现了InitializingBean(bean初始化完成后执行),AbstractAutoProxyCreator,ApplicationContextAware(获取ApplicationContext对象),DisposableBean(bean被消耗时执行),分别对应的spring中bean不同的生命周期。如下所示是spring bean初始化完成后执行

 这里会存在一个疑问,为何要开启RM与TM两个client,如果对于某一个服务它在分布式事务链路中只是作为一个分支即RM的角色而非TM,那么对于这TM的启动是否没有存在必要,毕竟需要开启TM与TC之间的连接通道,也是一个资源的浪费。

1.1:客户端TM client

在1.2与1.3对于TM与TC之间连接的有关的管理类有着不同的命名

1.2的时候命名为TmRpcClient

对于1.3的时候改命名为TmNettyRemotingClient如下所示:

 其实不论上述两个版本核心都是通过Netty作为服务之间远程网络通信基础架构,所以1.3的改为TmNettyRemotingClient更简单表达底层实现原理。后续都以1.3最新版作为讲解

1.1.1:TmNettyRemotingClient(核心类,TM远程调用client)

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {private static final Logger LOGGER = LoggerFactory.getLogger(TmNettyRemotingClient.class);private static volatile TmNettyRemotingClient instance;//长链接 keep-alive时间 private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;//常量 线程 等待队列长度private static final int MAX_QUEUE_SIZE = 2000;//是否初始化标示private final AtomicBoolean initialized = new AtomicBoolean(false);//配置applicationId唯一idprivate String applicationId;private String transactionServiceGroup;@Overridepublic void init() {// 注册返回response 消息处理器registerProcessor();//初始化if (initialized.compareAndSet(false, true)) {super.init();}}private TmNettyRemotingClient(NettyClientConfig nettyClientConfig,EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor) {super(nettyClientConfig, eventExecutorGroup, messageExecutor, NettyPoolKey.TransactionRole.TMROLE);}/*** 获取一个TmNettyRemotingClient** @param applicationId           the application id* @param transactionServiceGroup the transaction service group* @return the instance*/public static TmNettyRemotingClient getInstance(String applicationId, String transactionServiceGroup) {//作为一个单列的形式获取TmNettyRemotingClientTmNettyRemotingClient tmNettyRemotingClient = getInstance();tmNettyRemotingClient.setApplicationId(applicationId);tmNettyRemotingClient.setTransactionServiceGroup(transactionServiceGroup);return tmNettyRemotingClient;}/*** 单例获取 懒汉式获取* @return the instance*/public static TmNettyRemotingClient getInstance() {if (instance == null) {synchronized (TmNettyRemotingClient.class) {if (instance == null) {NettyClientConfig nettyClientConfig = new NettyClientConfig();//定义线程poolfinal ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),nettyClientConfig.getClientWorkerThreads()),RejectedPolicies.runsOldestTaskPolicy());instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);}}}return instance;}/*** Sets application id.** @param applicationId the application id*/public void setApplicationId(String applicationId) {this.applicationId = applicationId;}/*** Sets transaction service group.** @param transactionServiceGroup the transaction service group*/public void setTransactionServiceGroup(String transactionServiceGroup) {this.transactionServiceGroup = transactionServiceGroup;}@Overridepublic String getTransactionServiceGroup() {return transactionServiceGroup;}/***注册成功回调*/@Overridepublic void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,AbstractMessage requestMessage) {RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;if (LOGGER.isInfoEnabled()) {LOGGER.info("register TM success. client version:{}, server version:{},channel:{}", registerTMRequest.getVersion(), registerTMResponse.getVersion(), channel);}getClientChannelManager().registerChannel(serverAddress, channel);}@Overridepublic void onRegisterMsgFail(String serverAddress, Channel channel, Object response,AbstractMessage requestMessage) {RegisterTMRequest registerTMRequest = (RegisterTMRequest)requestMessage;RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;String errMsg = String.format("register TM failed. client version: %s,server version: %s, errorMsg: %s, " + "channel: %s", registerTMRequest.getVersion(), registerTMResponse.getVersion(), registerTMResponse.getMsg(), channel);throw new FrameworkException(errMsg);}/*** bean被销毁*/@Overridepublic void destroy() {super.destroy();initialized.getAndSet(false);instance = null;}@Overrideprotected Function<String, NettyPoolKey> getPoolKeyFunction() {return (severAddress) -> {RegisterTMRequest message = new RegisterTMRequest(applicationId, transactionServiceGroup);return new NettyPoolKey(NettyPoolKey.TransactionRole.TMROLE, severAddress, message);};}/*** 注册 TC response 有关处理器*/private void registerProcessor() {//注册 TC response netty返回信息解析器ClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);// 2.注册 heartbeat netty返回信息解析器ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}
}//父类中init
public void init() {//定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为trueif (NettyClientConfig.isEnableClientBatchSendRequest()) {//定义线程poolmergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));//运行MergedSendRunnable任务即合并发送请求mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//clientBootstrap.start();}

 NettyClientBootstrap是对于NettyClient的封装,对该类进行源码分析:

 

public class NettyClientBootstrap implements RemotingBootstrap {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);//client有关配置private final NettyClientConfig nettyClientConfig;//netty 启动类private final Bootstrap bootstrap = new Bootstrap();//netty workerprivate final EventLoopGroup eventLoopGroupWorker;//事件调度private EventExecutorGroup defaultEventExecutorGroup;//是否初始化标示private final AtomicBoolean initialized = new AtomicBoolean(false);private static final String THREAD_PREFIX_SPLIT_CHAR = "_";private final NettyPoolKey.TransactionRole transactionRole;//netty handler事件private ChannelHandler[] channelHandlers;public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,NettyPoolKey.TransactionRole transactionRole) {if (nettyClientConfig == null) {nettyClientConfig = new NettyClientConfig();if (LOGGER.isInfoEnabled()) {LOGGER.info("use default netty client config.");}}this.nettyClientConfig = nettyClientConfig;int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();this.transactionRole = transactionRole;//nio event groupthis.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),selectorThreadSizeThreadSize));this.defaultEventExecutorGroup = eventExecutorGroup;}/*** Sets channel handlers.** @param handlers the handlers*/protected void setChannelHandlers(final ChannelHandler... handlers) {if (handlers != null) {channelHandlers = handlers;}}/*** Add channel pipeline last.** @param channel  the channel* @param handlers the handlers*/private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {if (channel != null && handlers != null) {channel.pipeline().addLast(handlers);}}@Overridepublic void start() {if (this.defaultEventExecutorGroup == null) {this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),nettyClientConfig.getClientWorkerThreads()));}//初始化 netty client 并设置option属性this.bootstrap.group(this.eventLoopGroupWorker).channel(nettyClientConfig.getClientChannelClazz()).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()).option(ChannelOption.SO_RCVBUF,nettyClientConfig.getClientSocketRcvBufSize());if (nettyClientConfig.enableNative()) {if (PlatformDependent.isOsx()) {if (LOGGER.isInfoEnabled()) {LOGGER.info("client run on macOS");}} else {bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED).option(EpollChannelOption.TCP_QUICKACK, true);}}//通过pipeline 绑定默认Handler 以及自定义handlerbootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(/*** 设置channel 空闲状态处理器,是用来检测当前Handler的ChannelRead()的空闲时间* int readerIdleTimeSeconds 为读超时时间(即多长时间没有接受到客户端发送数据)* int writerIdleTimeSeconds, 为写超时时间(即多长时间没有向客户端发送数据)* int allIdleTimeSeconds 所有类型(读或写)的超时时间* 根据个参数IdleStateHandler会启动不同的定时任务,根据设定的时长去检测ChannelRead()方法是否被调用,* 如果没有被调用。之后则会调用后续handler的userEventTriggered方法去执行一些事情(比如断开链接)*/new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds()))//设置编码解码器.addLast(new ProtocolV1Decoder()).addLast(new ProtocolV1Encoder());if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers);}}});if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");}}@Overridepublic void shutdown() {try {//关闭netty网络资源this.eventLoopGroupWorker.shutdownGracefully();if (this.defaultEventExecutorGroup != null) {this.defaultEventExecutorGroup.shutdownGracefully();}} catch (Exception exx) {LOGGER.error("Failed to shutdown: {}", exx.getMessage());}}/*** * 获取一个新的channel channel为与TC之间网络通道* @param address the address 网络地址* @return the new channel*/public Channel getNewChannel(InetSocketAddress address) {Channel channel;//连接TCChannelFuture f = this.bootstrap.connect(address);try {//等待超时时间内与Server端进行 若无法连接抛出异常f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);if (f.isCancelled()) {throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");} else if (!f.isSuccess()) {throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");} else {channel = f.channel();}} catch (Exception e) {throw new FrameworkException(e, "can not connect to services-server.");}return channel;}/*** Gets thread prefix.** @param threadPrefix the thread prefix* @return the thread prefix*/private String getThreadPrefix(String threadPrefix) {return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();}
}

以上为TM大体的初始化过程,详细可自行研读源码


GlobalTransactionScanner中afterPropertiesSet解析完成之后,会执行AbstractAutoProxyCreator(该类用于为Bean生成代理对象,)中的wrapIfNecessary()方法,(AbstractAutoProxyCreator实际上实现了BeanPostProcessor接口,而wrapIfNecessary在postProcessAfterInitialization方法中被调用,因此它在afterPropertiesSet之后执行

wrapIfNecessary源码分析:

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {//是否开启GlobalTransactionif (disableGlobalTransaction) {return bean;}try {//PROXYED_SET 记录已被代理synchronized (PROXYED_SET) {//该bean 是否已被代理 若已被无需重复代理if (PROXYED_SET.contains(beanName)) {return bean;}//MethodInterceptor 定义方法拦截器interceptor = null;//检测是否是TCC 模式下代理if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCCinterceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));} else {//非jdk代理 基于class方式Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);//如果bean是jdk代理(基于接口) 获取元ClassClass<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);//是否包含Annotationif (!existsAnnotation(new Class[]{serviceInterface})&& !existsAnnotation(interfacesIfJdk)) {return bean;}if (interceptor == null) {if (globalTransactionalInterceptor == null) {//构建globalTransactionalInterceptorglobalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,(ConfigurationChangeListener)globalTransactionalInterceptor);}interceptor = globalTransactionalInterceptor;}}LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());if (!AopUtils.isAopProxy(bean)) {//如果gaibean不是aop代理类bean = super.wrapIfNecessary(bean, beanName, cacheKey);} else {// 执行包装目标对象到代理对象  AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));for (Advisor avr : advisor) {advised.addAdvisor(0, avr);}}PROXYED_SET.add(beanName);return bean;}} catch (Exception exx) {throw new RuntimeException(exx);}}/*** 目标 classes的方法中是否包含GlobalTransactional 或GlobalLock 注解* @param classes* @return*/private boolean existsAnnotation(Class<?>[] classes) {if (CollectionUtils.isNotEmpty(classes)) {for (Class<?> clazz : classes) {if (clazz == null) {continue;}GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}Method[] methods = clazz.getMethods();for (Method method : methods) {//是否包含GlobalTransactional注解trxAnno = method.getAnnotation(GlobalTransactional.class);if (trxAnno != null) {return true;}//GlobalLockGlobalLock lockAnno = method.getAnnotation(GlobalLock.class);if (lockAnno != null) {return true;}}}}return false;}

从上述代码中可看出,用GlobalTransactionalInterceptor 代替了GlobalTransactional 和 GlobalLock 注解的方法

1.3:GlobalTransactionalInterceptor(全局事务拦截器)

该类用于代理处理@GlobalTransactional被执行,如下源码所示:

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();//事务模版类private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();//失败处理器private final FailureHandler failureHandler;private volatile boolean disable;//服务自检周期	默认2000,单位ms.每2秒进行一次服务自检,来决定private static int degradeCheckPeriod;//降级检测开关 降级开关	默认false。业务侧根据连续错误数自动降级不走seata事务private static volatile boolean degradeCheck;//升降级达标阈值	默认10private static int degradeCheckAllowTimes;private static volatile Integer degradeNum = 0;private static volatile Integer reachNum = 0;private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);//用于周期检测是否降级 执行器 应该在degradeCheck =true时被初始化private static ScheduledThreadPoolExecutor executor =new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));/*** Instantiates a new Global transactional interceptor.** @param failureHandler*            the failure handler*/public GlobalTransactionalInterceptor(FailureHandler failureHandler) {//初始化动作this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,DEFAULT_DISABLE_GLOBAL_TRANSACTION);degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,DEFAULT_TM_DEGRADE_CHECK);//开启降级设置if (degradeCheck) {ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);EVENT_BUS.register(this);if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {startDegradeCheck();}}}/*** 代理方法调用逻辑* @param methodInvocation 被代理的原方法* @return* @throws Throwable*/@Overridepublic Object invoke(final MethodInvocation methodInvocation) throws Throwable {//获取方法所属类Class<?> targetClass =methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;//获取执行具体的Method对象Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);//获取GlobalTransactional注解对象 获取定义属性final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);//获取GlobalLock对象final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//是否被降级或者开启全局事务boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);if (!localDisable) {//判定globalTransactional注解还是globalLock全局锁对象if (globalTransactionalAnnotation != null) {return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);} else if (globalLockAnnotation != null) {return handleGlobalLock(methodInvocation);}}}//执行具体方法return methodInvocation.proceed();}/**** @param methodInvocation* @return* @throws Exception*/private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {//return globalLockTemplate.execute(() -> {try {return methodInvocation.proceed();} catch (Exception e) {throw e;} catch (Throwable e) {throw new RuntimeException(e);}});}/*** 核心代理本质通过transactionalTemplate来实现* @param methodInvocation* @param globalTrxAnno* @return* @throws Throwable*/private Object handleGlobalTransaction(final MethodInvocation methodInvocation,final GlobalTransactional globalTrxAnno) throws Throwable {boolean succeed = true;try {return transactionalTemplate.execute(new TransactionalExecutor() {@Overridepublic Object execute() throws Throwable {//执行原方法return methodInvocation.proceed();}public String name() {String name = globalTrxAnno.name();if (!StringUtils.isNullOrEmpty(name)) {return name;}return formatMethod(methodInvocation.getMethod());}@Overridepublic TransactionInfo getTransactionInfo() {//根据注解中设定信息构建TransactionInfo transactionalTemplate中需要使用TransactionInfo transactionInfo = new TransactionInfo();transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());transactionInfo.setName(name());transactionInfo.setPropagation(globalTrxAnno.propagation());Set<RollbackRule> rollbackRules = new LinkedHashSet<>();for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {rollbackRules.add(new RollbackRule(rbRule));}for (String rbRule : globalTrxAnno.rollbackForClassName()) {rollbackRules.add(new RollbackRule(rbRule));}for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {rollbackRules.add(new NoRollbackRule(rbRule));}for (String rbRule : globalTrxAnno.noRollbackForClassName()) {rollbackRules.add(new NoRollbackRule(rbRule));}transactionInfo.setRollbackRules(rollbackRules);return transactionInfo;}});} catch (TransactionalExecutor.ExecutionException e) {TransactionalExecutor.Code code = e.getCode();switch (code) {//在事务哪阶段发生了异常 根据不同异常分支走不同代码case RollbackDone:throw e.getOriginalException();case BeginFailure://第一阶段发生异常succeed = false;failureHandler.onBeginFailure(e.getTransaction(), e.getCause());throw e.getCause();case CommitFailure:succeed = false;//commit发生异常failureHandler.onCommitFailure(e.getTransaction(), e.getCause());throw e.getCause();case RollbackFailure://回滚时发生异常failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();case RollbackRetrying://回滚重试异常failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());throw e.getOriginalException();default://其它异常throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));}} finally {if (degradeCheck) {EVENT_BUS.post(new DegradeCheckEvent(succeed));}}}public <T extends Annotation> T getAnnotation(Method method, Class<?> targetClass, Class<T> annotationClass) {return Optional.ofNullable(method).map(m -> m.getAnnotation(annotationClass)).orElse(Optional.ofNullable(targetClass).map(t -> t.getAnnotation(annotationClass)).orElse(null));}//构建该方法唯一名称 避免方法重载使用入参private String formatMethod(Method method) {StringBuilder sb = new StringBuilder(method.getName()).append("(");//方法执行参数类型Class<?>[] params = method.getParameterTypes();int in = 0;for (Class<?> clazz : params) {sb.append(clazz.getName());if (++in < params.length) {sb.append(", ");}}return sb.append(")").toString();}/*** 监听ConfigurationChangeEvent事件 只针对于disable_global_transaction与client_degrade_check变更* @param event the event*/@Overridepublic void onChangeEvent(ConfigurationChangeEvent event) {if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,disable, event.getNewValue());disable = Boolean.parseBoolean(event.getNewValue().trim());} else if (ConfigurationKeys.CLIENT_DEGRADE_CHECK.equals(event.getDataId())) {degradeCheck = Boolean.parseBoolean(event.getNewValue());if (!degradeCheck) {degradeNum = 0;}}}/*** auto upgrade service detection*/private static void startDegradeCheck() {executor.scheduleAtFixedRate(() -> {if (degradeCheck) {try {String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);TransactionManagerHolder.get().commit(xid);EVENT_BUS.post(new DegradeCheckEvent(true));} catch (Exception e) {EVENT_BUS.post(new DegradeCheckEvent(false));}}}, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);}@Subscribepublic static void onDegradeCheck(DegradeCheckEvent event) {if (event.isRequestSuccess()) {if (degradeNum >= degradeCheckAllowTimes) {reachNum++;if (reachNum >= degradeCheckAllowTimes) {reachNum = 0;degradeNum = 0;if (LOGGER.isInfoEnabled()) {LOGGER.info("the current global transaction has been restored");}}} else if (degradeNum != 0) {degradeNum = 0;}} else {if (degradeNum < degradeCheckAllowTimes) {degradeNum++;if (degradeNum >= degradeCheckAllowTimes) {if (LOGGER.isWarnEnabled()) {LOGGER.warn("the current global transaction has been automatically downgraded");}}} else if (reachNum != 0) {reachNum = 0;}}}
}

从上述源码中invoke方法可知,本质通过TransactionalTemplate的execute来执行真正的流程,如下所示:

1.4:TransactionalTemplate(事务执行模版)

该类封装了事务执行的

public class TransactionalTemplate {private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);/*** Execute object.** @param business the business* @return the object* @throws TransactionalExecutor.ExecutionException the execution exception*/public Object execute(TransactionalExecutor business) throws Throwable {// 1 获取GlobalTransactionalInterceptor中根据注解封装的TransactionInfo类TransactionInfo txInfo = business.getTransactionInfo();if (txInfo == null) {throw new ShouldNeverHappenException("transactionInfo does not exist");}// 1.1创建一个全局事务 默认为DefaultGlobalTransaction 感觉当前上下文中是否包含一个xidGlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();// 1.2 处理不同的事务传播级别和branchTypePropagation propagation = txInfo.getPropagation();SuspendedResourcesHolder suspendedResourcesHolder = null;try {switch (propagation) {//处理不同的事务传播级别case NOT_SUPPORTED:suspendedResourcesHolder = tx.suspend(true);return business.execute();case REQUIRES_NEW:suspendedResourcesHolder = tx.suspend(true);break;case SUPPORTS:if (!existingTransaction()) {//如果已经存在事务直接执行 不创建事务return business.execute();}break;case REQUIRED:break;case NEVER:if (existingTransaction()) {//如果已经存在事务throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s",RootContext.getXID()));} else {return business.execute();}case MANDATORY:if (!existingTransaction()) {throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");}break;default:throw new TransactionException("Not Supported Propagation:" + propagation);}try {// 2. 开始 TransactionbeginTransaction(txInfo, tx);Object rs = null;try {// 执行业务代码rs = business.execute();} catch (Throwable ex) {//3 在业务代码执行若发生异常 判定抛出的异常是否需要被回滚completeTransactionAfterThrowing(txInfo, tx, ex);throw ex;}// 4.所有方法都以准备完成 commit事务commitTransaction(tx);//返回结果集return rs;} finally {//5. 清除triggerAfterCompletion();cleanUp();}} finally {tx.resume(suspendedResourcesHolder);}}public boolean existingTransaction() {return StringUtils.isNotEmpty(RootContext.getXID());}private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {//roll backif (txInfo != null && txInfo.rollbackOn(originalException)) {try {//需要进行回滚rollbackTransaction(tx, originalException);} catch (TransactionException txe) {//回滚失败 抛出RollbackFailure类型异常 由GlobalTransactionalInterceptor处理throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.RollbackFailure, originalException);}} else {//这个异常不需要进行 回滚 直接提交commitTransaction(tx);}}/*** 提交事务* @param tx* @throws TransactionalExecutor.ExecutionException*/private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//执行commit的之前钩子函数triggerBeforeCommit();tx.commit();//执行commit的after钩子函数triggerAfterCommit();} catch (TransactionException txe) {// 4.1 事务提交失败throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.CommitFailure);}}/*** 回滚一个事务* @param tx* @param originalException* @throws TransactionException* @throws TransactionalExecutor.ExecutionException*/private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {//执行rollback的之前钩子函数triggerBeforeRollback();tx.rollback();//执行rollback的之后钩子函数triggerAfterRollback();// 3.1 Successfully rolled backthrow new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);}/*** 开始一个事务* @param txInfo* @param tx* @throws TransactionalExecutor.ExecutionException*/private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {try {//执行beginTransaction的before钩子函数triggerBeforeBegin();//底层是通过GlobalTransaction来执行tx.begin(txInfo.getTimeOut(), txInfo.getName());//执行beginTransaction的After钩子函数triggerAfterBegin();} catch (TransactionException txe) {throw new TransactionalExecutor.ExecutionException(tx, txe,TransactionalExecutor.Code.BeginFailure);}}private void triggerBeforeBegin() {for (TransactionHook hook : getCurrentHooks()) {try {hook.beforeBegin();} catch (Exception e) {LOGGER.error("Failed execute beforeBegin in hook {}",e.getMessage(),e);}}}private void triggerAfterBegin() {for (TransactionHook hook : getCurrentHooks()) {try {hook.afterBegin();} catch (Exception e) {LOGGER.error("Failed execute afterBegin in hook {}",e.getMessage(),e);}}}private void triggerBeforeRollback() {for (TransactionHook hook : getCurrentHooks()) {try {hook.beforeRollback();} catch (Exception e) {LOGGER.error("Failed execute beforeRollback in hook {}",e.getMessage(),e);}}}private void triggerAfterRollback() {for (TransactionHook hook : getCurrentHooks()) {try {hook.afterRollback();} catch (Exception e) {LOGGER.error("Failed execute afterRollback in hook {}",e.getMessage(),e);}}}private void triggerBeforeCommit() {for (TransactionHook hook : getCurrentHooks()) {try {hook.beforeCommit();} catch (Exception e) {LOGGER.error("Failed execute beforeCommit in hook {}",e.getMessage(),e);}}}private void triggerAfterCommit() {for (TransactionHook hook : getCurrentHooks()) {try {hook.afterCommit();} catch (Exception e) {LOGGER.error("Failed execute afterCommit in hook {}",e.getMessage(),e);}}}private void triggerAfterCompletion() {for (TransactionHook hook : getCurrentHooks()) {try {hook.afterCompletion();} catch (Exception e) {LOGGER.error("Failed execute afterCompletion in hook {}",e.getMessage(),e);}}}private void cleanUp() {TransactionHookManager.clear();}private List<TransactionHook> getCurrentHooks() {return TransactionHookManager.getHooks();}}

从上述源码可以看出对于全局事务的提交回滚都是通过GlobalTransaction接口来实现的

1.4:GlobalTransaction

该接口提供了事务有关的所有方法,具体的实现为DefaultGlobalTransaction

public interface GlobalTransaction {/*** 使用默认超时和名称开始新的全局事务**/void begin() throws TransactionException;/*** 使用给定的超时和默认名称开始新的全局事务。**/void begin(int timeout) throws TransactionException;/***使用给定的超时和给定的名称开始新的全局事务。**/void begin(int timeout, String name) throws TransactionException;/*** 提交全局事务。**/void commit() throws TransactionException;/*** 回滚全局事务。**/void rollback() throws TransactionException;/*** 暂停全局事务。**/SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException;/*** 恢复全局事务。**/void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;/***向TC询问相应全局事务的当前状态。**/GlobalStatus getStatus() throws TransactionException;/*** 获取 XID.** @return XID. xid*/String getXid();/**** 向tc报告全局事务状态**/void globalReport(GlobalStatus globalStatus) throws TransactionException;/*** 全局事务的本地状态。**/GlobalStatus getLocalStatus();
}

1.4:DefaultGlobalTransaction

public class DefaultGlobalTransaction implements GlobalTransaction {private static final Logger LOGGER = LoggerFactory.getLogger(DefaultGlobalTransaction.class);//默认全局事务执行的超时事件private static final int DEFAULT_GLOBAL_TX_TIMEOUT = 60000;private static final String DEFAULT_GLOBAL_TX_NAME = "default";private TransactionManager transactionManager;//全局xidprivate String xid;//全局状态private GlobalStatus status;//当前执行流程在全局事务中的角色 Launcher 或Participantprivate GlobalTransactionRole role;private static final int COMMIT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_TM_COMMIT_RETRY_COUNT, DEFAULT_TM_COMMIT_RETRY_COUNT);private static final int ROLLBACK_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_TM_ROLLBACK_RETRY_COUNT, DEFAULT_TM_ROLLBACK_RETRY_COUNT);/*** 实例化新的默认全局事务。*/DefaultGlobalTransaction() {this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);}/*** 实例化新的默认全局事务。** @param xid    the xid* @param status the status* @param role   the role*/DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {this.transactionManager = TransactionManagerHolder.get();this.xid = xid;this.status = status;this.role = role;}@Overridepublic void begin() throws TransactionException {begin(DEFAULT_GLOBAL_TX_TIMEOUT);}@Overridepublic void begin(int timeout) throws TransactionException {begin(timeout, DEFAULT_GLOBAL_TX_NAME);}@Overridepublic void begin(int timeout, String name) throws TransactionException {//验证角色为Launcher 提交者if (role != GlobalTransactionRole.Launcher) {//作为一个参与者 上下文中肯定包含xid 若不包含抛出异常assertXIDNotNull();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);}return;}//验证xid是否为null 不为 null说明当前事务状态不对assertXIDNull();if (RootContext.getXID() != null) {throw new IllegalStateException();}//获取一个新的xidxid = transactionManager.begin(null, null, name, timeout);//变更当前事务状态status = GlobalStatus.Begin;//将这个xid写入到全局事务上下文中RootContext.bind(xid);if (LOGGER.isInfoEnabled()) {LOGGER.info("Begin new global transaction [{}]", xid);}}@Overridepublic void commit() throws TransactionException {//如果是Participant参与者无法进行事务提交if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of committingif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);}return;}assertXIDNotNull();//提交重试次数 默认5次int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;try {//一直重试知道达到最大次数或commit成功while (retry > 0) {try {//提交事务status = transactionManager.commit(xid);break;} catch (Throwable ex) {LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());//重试次数-1retry--;if (retry == 0) {throw new TransactionException("Failed to report global commit", ex);}}}} finally {if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {//解绑xidsuspend(true);}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] commit status: {}", xid, status);}}@Overridepublic void rollback() throws TransactionException {if (role == GlobalTransactionRole.Participant) {// Participant has no responsibility of rollbackif (LOGGER.isDebugEnabled()) {LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);}return;}//验证xid是否有效assertXIDNotNull();//回滚重试次数 默认5次int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;try {while (retry > 0) {try {//获取回滚状态status = transactionManager.rollback(xid);break;} catch (Throwable ex) {LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());retry--;if (retry == 0) {throw new TransactionException("Failed to report global rollback", ex);}}}} finally {if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {//暂停任务 解绑xidsuspend(true);}}if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] rollback status: {}", xid, status);}}/*** 中断全局事务* @param unbindXid if true,suspend the global transaction.* @return* @throws TransactionException*/@Overridepublic SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException {String xid = RootContext.getXID();if (StringUtils.isNotEmpty(xid) && unbindXid) {RootContext.unbind();if (LOGGER.isDebugEnabled()) {LOGGER.debug("Suspending current transaction,xid = {}",xid);}} else {xid = null;}return new SuspendedResourcesHolder(xid);}/*** 继续全局事务* @param suspendedResourcesHolder the suspended resources to resume* @throws TransactionException*/@Overridepublic void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException {if (suspendedResourcesHolder == null) {return;}String xid = suspendedResourcesHolder.getXid();if (StringUtils.isNotEmpty(xid)) {RootContext.bind(xid);if (LOGGER.isDebugEnabled()) {LOGGER.debug("Resumimg the transaction,xid = {}", xid);}}}/*** 获取TC中携有的全局事务状态* @return* @throws TransactionException*/@Overridepublic GlobalStatus getStatus() throws TransactionException {if (xid == null) {return GlobalStatus.UnKnown;}status = transactionManager.getStatus(xid);return status;}/*** 获取全局xid* @return*/@Overridepublic String getXid() {return xid;}@Overridepublic void globalReport(GlobalStatus globalStatus) throws TransactionException {assertXIDNotNull();if (globalStatus == null) {throw new IllegalStateException();}//远程向tc报告当前事务状态status = transactionManager.globalReport(xid, globalStatus);if (LOGGER.isInfoEnabled()) {LOGGER.info("[{}] report status: {}", xid, status);}if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {suspend(true);}}/*** 获取本地持有的全局事务状态* @return*/@Overridepublic GlobalStatus getLocalStatus() {return status;}private void assertXIDNotNull() {if (xid == null) {throw new IllegalStateException();}}private void assertXIDNull() {if (xid != null) {throw new IllegalStateException();}}}

1.4:DefaultTransactionManager

用于管理TM与TC之间交互方法

public class DefaultTransactionManager implements TransactionManager {@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout)throws TransactionException {//构建请求参数GlobalBeginRequest request = new GlobalBeginRequest();request.setTransactionName(name);request.setTimeout(timeout);//同步发送begin netty请求GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);//判断返回状态if (response.getResultCode() == ResultCode.Failed) {throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());}//获取全局xidreturn response.getXid();}@Overridepublic GlobalStatus commit(String xid) throws TransactionException {//构建请求参数GlobalCommitRequest globalCommit = new GlobalCommitRequest();globalCommit.setXid(xid);//同步发送commit netty请求GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);//返回当前全局事务状态return response.getGlobalStatus();}@Overridepublic GlobalStatus rollback(String xid) throws TransactionException {//构建请求参数GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();globalRollback.setXid(xid);//同步发送rollback netty请求GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);//返回当前全局事务状态return response.getGlobalStatus();}@Overridepublic GlobalStatus getStatus(String xid) throws TransactionException {GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();queryGlobalStatus.setXid(xid);GlobalStatusResponse response = (GlobalStatusResponse) syncCall(queryGlobalStatus);return response.getGlobalStatus();}@Overridepublic GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException {GlobalReportRequest globalReport = new GlobalReportRequest();globalReport.setXid(xid);globalReport.setGlobalStatus(globalStatus);//上报当前分支事务状态GlobalReportResponse response = (GlobalReportResponse) syncCall(globalReport);return response.getGlobalStatus();}private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {try {return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);} catch (TimeoutException toe) {throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);}}
}

 

从上述代码可以看出通过sendSyncRequest()来发送netty请求,sendSyncRequest为TmNettyRemotingClient父类AbstractNettyRemotingClient方法。

1.5:AbstractNettyRemotingClient

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingClient.class);private static final String MSG_ID_PREFIX = "msgId:";private static final String FUTURES_PREFIX = "futures:";private static final String SINGLE_LOG_POSTFIX = ";";private static final int MAX_MERGE_SEND_MILLS = 1;private static final String THREAD_PREFIX_SPLIT_CHAR = "_";private static final int MAX_MERGE_SEND_THREAD = 1;private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";protected final Object mergeLock = new Object();/*** When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.*/protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();/*** When batch sending is enabled, the message will be stored to basketMap* Send via asynchronous thread {@link MergedSendRunnable}* {@link NettyClientConfig#isEnableClientBatchSendRequest}*/protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();private final NettyClientBootstrap clientBootstrap;private NettyClientChannelManager clientChannelManager;private final NettyPoolKey.TransactionRole transactionRole;private ExecutorService mergeSendExecutorService;private TransactionMessageHandler transactionMessageHandler;@Overridepublic void init() {//定义周期延时任务默认10s 该任务用于TM与TC的channel的连接检测 对于断的Channel进行重连timerExecutor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {clientChannelManager.reconnect(getTransactionServiceGroup());}}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);//是否配置开启transport.enableClientBatchSendRequest即客户端事务消息请求是否批量合并发送 默认为true//批量发送的原理使用 等待(汇集数据)唤醒(发送数据)机制 + CompletableFuture获取异步发送回调方法if (NettyClientConfig.isEnableClientBatchSendRequest()) {//定义线程pool 默认1个线程mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,MAX_MERGE_SEND_THREAD,KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));//运行MergedSendRunnable任务即合并发送请求mergeSendExecutorService.submit(new MergedSendRunnable());}super.init();//netty client 启动clientBootstrap.start();}public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {super(messageExecutor);this.transactionRole = transactionRole;clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);//设置netty handler 接受netty 结果返回结果集并写入到 MessageFuture的rs中clientBootstrap.setChannelHandlers(new ClientHandler());clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);}@Overridepublic Object sendSyncRequest(Object msg) throws TimeoutException {//一般TC肯定为集群部署 通过负载均衡算法获取对应服务器地址String serverAddress = loadBalance(getTransactionServiceGroup());//获取请求超时时间 默认30sint timeoutMillis = NettyClientConfig.getRpcRequestTimeout();//构建请求信息RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);// send batch message// put message into basketMap, @see MergedSendRunnable//是否开启批量发送 默认开启 如果开启将信息缓存到basketMap 然后再统一发送// 在等待批量发送阶段时 这些线程对应任务都在阻塞等待 知道发送成功后才被唤醒if (NettyClientConfig.isEnableClientBatchSendRequest()) {// send batch message is sync request, needs to create messageFuture and put it in futures.//发送批处理消息是同步请求,需要创建messageFuture(本质通过CompletableFuture实现)并将其放入futures缓存中。MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);futures.put(rpcMessage.getId(), messageFuture);// 数据写入到basketMap 缓存中ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;//获取请求TC地址对应的任务队列BlockingQueue<RpcMessage> basket = map.get(serverAddress);//为每一个TC地址创建一个发送任务队列if (basket == null) {//数据写入map.putIfAbsent(serverAddress, new LinkedBlockingQueue<>());basket = map.get(serverAddress);}//写入队列basket.offer(rpcMessage);if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());}//判断是否是否可以发送if (!isSending) {//唤醒mergeLock 锁下阻塞等待数据的MergedSendRunnable(一次会将堆积的数据全部请求发送掉 堆积的数据大小为上次MergedSendRunnable发送时间)synchronized (mergeLock) {mergeLock.notifyAll();}}try {//在超时时间内阻塞等待结果return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}",exx.getMessage(), serverAddress, rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}} else {//获取连接通道Netty ChannelChannel channel = clientChannelManager.acquireChannel(serverAddress);//直接将数据发送return super.sendSync(channel, rpcMessage, timeoutMillis);}}@Overridepublic Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {if (channel == null) {LOGGER.warn("sendSyncRequest nothing, caused by null channel.");return null;}RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);return super.sendSync(channel, rpcMessage, NettyClientConfig.getRpcRequestTimeout());}@Overridepublic void sendAsyncRequest(Channel channel, Object msg) {if (channel == null) {LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");return;}//构建请求 RpcMessageRpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);if (rpcMessage.getBody() instanceof MergeMessage) {//记录merge后消息缓存体mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());}super.sendAsync(channel, rpcMessage);}@Overridepublic void sendAsyncResponse(String serverAddress, RpcMessage rpcMessage, Object msg) {RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, ProtocolConstants.MSGTYPE_RESPONSE);Channel channel = clientChannelManager.acquireChannel(serverAddress);super.sendAsync(channel, rpcMsg);}/*** 为每个messagetype 注册processor数据解析执行其器* @param requestCode* @param processor   {@link RemotingProcessor}* @param executor    thread pool*/@Overridepublic void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);this.processorTable.put(requestCode, pair);}@Overridepublic void destroyChannel(String serverAddress, Channel channel) {clientChannelManager.destroyChannel(serverAddress, channel);}@Overridepublic void destroy() {clientBootstrap.shutdown();if (mergeSendExecutorService != null) {mergeSendExecutorService.shutdown();}super.destroy();}public void setTransactionMessageHandler(TransactionMessageHandler transactionMessageHandler) {this.transactionMessageHandler = transactionMessageHandler;}public TransactionMessageHandler getTransactionMessageHandler() {return transactionMessageHandler;}public NettyClientChannelManager getClientChannelManager() {return clientChannelManager;}/*** 根据注册中心获取tx对应实际地址  并根据负载均衡算法从集群地址中选取任意一个地址* @param transactionServiceGroup tx-service-group 事务分组* @return*/private String loadBalance(String transactionServiceGroup) {InetSocketAddress address = null;try {@SuppressWarnings("unchecked")//根据使用不同的注册中心获取tx对应实际地址()List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);//负载均衡算法(支持两种线性roundRobin与随机)从集群地址中选取任意一个地址address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);} catch (Exception ex) {LOGGER.error(ex.getMessage());}if (address == null) {throw new FrameworkException(NoAvailableService);}//转化为标准http://ip+port地址return NetUtil.toStringAddress(address);}private String getThreadPrefix() {return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();}/*** Get pool key function.** @return lambda function*/protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();/*** Get transaction service group.** @return transaction service group*/protected abstract String getTransactionServiceGroup();/*** 合并数据发送任务*/private class MergedSendRunnable implements Runnable {@Overridepublic void run() {//死循环while (true) {//synchronized (mergeLock) {try {//阻塞等待1smergeLock.wait(MAX_MERGE_SEND_MILLS);} catch (InterruptedException e) {}}isSending = true;//循环发送缓存basketMap中存储数据for (String address : basketMap.keySet()) {BlockingQueue<RpcMessage> basket = basketMap.get(address);if (basket.isEmpty()) {continue;}//message 包装类 封装批量的RpcMessageMergedWarpMessage mergeMessage = new MergedWarpMessage();while (!basket.isEmpty()) {RpcMessage msg = basket.poll();mergeMessage.msgs.add((AbstractMessage) msg.getBody());mergeMessage.msgIds.add(msg.getId());}if (mergeMessage.msgIds.size() > 1) {//打印批量日志printMergeMessageLog(mergeMessage);}Channel sendChannel = null;try {// send batch message is sync request, but there is no need to get the return value.// Since the messageFuture has been created before the message is placed in basketMap,// the return value will be obtained in ClientOnResponseProcessor.//获取send channelsendChannel = clientChannelManager.acquireChannel(address);//发送批量同步请求 这里不需要获取返回值 而是通过messageFuture中的ClientOnResponseProcessor来返回AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);} catch (FrameworkException e) {//发送失败处理if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {destroyChannel(address, sendChannel);}// fast fail//发送merge消息失败 将之前futures缓存的messageId与future关系异常并将future回调结果置为nullfor (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);if (messageFuture != null) {messageFuture.setResultMessage(null);}}LOGGER.error("client merge call failed: {}", e.getMessage(), e);}}isSending = false;}}private void printMergeMessageLog(MergedWarpMessage mergeMessage) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());for (AbstractMessage cm : mergeMessage.msgs) {LOGGER.debug(cm.toString());}StringBuilder sb = new StringBuilder();for (long l : mergeMessage.msgIds) {sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);}sb.append("\n");for (long l : futures.keySet()) {sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);}LOGGER.debug(sb.toString());}}}/*** The type ClientHandler.*/@Sharableclass ClientHandler extends ChannelDuplexHandler {@Overridepublic void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}//处理TC返回的message数据processMessage(ctx, (RpcMessage) msg);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {synchronized (lock) {if (ctx.channel().isWritable()) {lock.notifyAll();}}ctx.fireChannelWritabilityChanged();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (messageExecutor.isShutdown()) {return;}if (LOGGER.isInfoEnabled()) {LOGGER.info("channel inactive: {}", ctx.channel());}clientChannelManager.releaseChannel(ctx.channel(), NetUtil.toStringAddress(ctx.channel().remoteAddress()));super.channelInactive(ctx);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.READER_IDLE) {if (LOGGER.isInfoEnabled()) {LOGGER.info("channel {} read idle.", ctx.channel());}try {String serverAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());clientChannelManager.invalidateObject(serverAddress, ctx.channel());} catch (Exception exx) {LOGGER.error(exx.getMessage());} finally {clientChannelManager.releaseChannel(ctx.channel(), getAddressFromContext(ctx));}}if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {try {if (LOGGER.isDebugEnabled()) {LOGGER.debug("will send ping msg,channel {}", ctx.channel());}AbstractNettyRemotingClient.this.sendAsyncRequest(ctx.channel(), HeartbeatMessage.PING);} catch (Throwable throwable) {LOGGER.error("send request error: {}", throwable.getMessage(), throwable);}}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(),NetUtil.toStringAddress(ctx.channel().remoteAddress()) + "connect exception. " + cause.getMessage(), cause);clientChannelManager.releaseChannel(ctx.channel(), getAddressFromChannel(ctx.channel()));if (LOGGER.isInfoEnabled()) {LOGGER.info("remove exception rm channel:{}", ctx.channel());}super.exceptionCaught(ctx, cause);}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {if (LOGGER.isInfoEnabled()) {LOGGER.info(ctx + " will closed");}super.close(ctx, future);}}
}
//单条记录发送
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {if (timeoutMillis <= 0) {throw new FrameworkException("timeout should more than 0ms");}if (channel == null) {LOGGER.warn("sendSync nothing, caused by null channel.");return null;}MessageFuture messageFuture = new MessageFuture();messageFuture.setRequestMessage(rpcMessage);messageFuture.setTimeout(timeoutMillis);//写入缓存futuresfutures.put(rpcMessage.getId(), messageFuture);channelWritableCheck(channel, rpcMessage.getBody());//message数据写入netty channel 异步发送 注册回调listenerchannel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {//异步回调失败处理if (!future.isSuccess()) {//从futures结果集中移除MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());if (messageFuture1 != null) {//记录失败原因messageFuture1.setResultMessage(future.cause());}//销毁关闭 channeldestroyChannel(future.channel());}});try {//等待获取回调传回的return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(),rpcMessage.getBody());if (exx instanceof TimeoutException) {throw (TimeoutException) exx;} else {throw new RuntimeException(exx);}}}

从上文源码可以看到根据EnableClientBatchSendRequest配置,由client决定是否将数据合并发送,如果该设置会加大seata TM的整体吞吐量,但会损失响应时长。关于这个配置可感觉实际业务场景设定,如果是大量的分布式事务场景,可以设置为true(默认也为true),若是少量可以设置为false,加快响应时间。不论单条还是merge消息聚合发送,本质是通过netty异步发送message,注册对于的listener回调监听,注册ChannelHandlers(ClientHandler继承ChannelInboundHandlerAdapter,在client接受response时执行),ClientHandler中获取response的body,通过body消息类型获取初始化绑定消息解析器(默认为ClientOnResponseProcessor,对于解析器的绑定动作在TmNettyRemotingClient或RmNettyRemotingClient的初始化init中执行)。而在ClientOnResponseProcessor中处理消息后将数据写入到MessageFuture的setResult中,这个MessageFuture中包含CompletableFuture类(对于MessageFuture提供的get或者setResult本质都是对CompletableFuture进行操作,MessageFuture算是对CompletableFuture的包装类吧)。在发送线程发送message后,线程通过MessageFuture提供的get的进行阻塞等待异步回调结果,只有当ClientOnResponseProcessor中对于的message有消息到达即setResult被执行,发送线程才能获取最终值返回值。对于此处的方式就是一个netty异步发送的具体实现。对于此处功能详细的描述,可自行参考源码。 

class NettyClientChannelManager {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientChannelManager.class);//每一个channel对应的lock 保证每一个TC server对只会有一个channel 被创建private final ConcurrentMap<String, Object> channelLocks = new ConcurrentHashMap<>();//TC-server地址(ip+port)为key  与NettyPoolKey(reg信息)关系private final ConcurrentMap<String, NettyPoolKey> poolKeyMap = new ConcurrentHashMap<>();//缓存所有channel TC-server地址(ip+port)为key channel为value RM 或TM client与TC集群的任意节点只会缓存一个channelprivate final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();//管理neety clinet key(reg信息)与channel 关系private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;//Tm 或RM中getPoolKeyFunction函数(reg信息) 映射 serverAddress 与NettyPoolKey关系private Function<String, NettyPoolKey> poolKeyFunction;NettyClientChannelManager(final NettyPoolableFactory keyPoolableFactory, final Function<String, NettyPoolKey> poolKeyFunction,final NettyClientConfig clientConfig) {//初始化 chnnel连接缓存pool 使用apache中连接poolnettyClientKeyPool = new GenericKeyedObjectPool<>(keyPoolableFactory);nettyClientKeyPool.setConfig(getNettyPoolConfig(clientConfig));this.poolKeyFunction = poolKeyFunction;}/*** 本地配置转化为GenericKeyedObjectPool 中连接pool设置* @param clientConfig* @return*/private GenericKeyedObjectPool.Config getNettyPoolConfig(final NettyClientConfig clientConfig) {//设置chnnel poolGenericKeyedObjectPool.Config poolConfig = new GenericKeyedObjectPool.Config();//最大存活数poolConfig.maxActive = clientConfig.getMaxPoolActive();//最小空闲数poolConfig.minIdle = clientConfig.getMinPoolIdle();//最大等待连接时间poolConfig.maxWait = clientConfig.getMaxAcquireConnMills();//测试poolConfig.testOnBorrow = clientConfig.isPoolTestBorrow();poolConfig.testOnReturn = clientConfig.isPoolTestReturn();poolConfig.lifo = clientConfig.isPoolLifo();return poolConfig;}/*** 获取在当前Rpc客户端上注册的所有通道** @return channels*/ConcurrentMap<String, Channel> getChannels() {return channels;}/*** 获与TC的netty客户端channel** @param serverAddress server address* @return netty channel*/Channel acquireChannel(String serverAddress) {Channel channelToServer = channels.get(serverAddress);//已经存在if (channelToServer != null) {//获取alive的channelchannelToServer = getExistAliveChannel(channelToServer, serverAddress);if (channelToServer != null) {return channelToServer;}}if (LOGGER.isInfoEnabled()) {LOGGER.info("will connect to " + serverAddress);}/*不存在channel 需要创建一个新的*///创建一个chanel对应lockchannelLocks.putIfAbsent(serverAddress, new Object());//加锁保证只会创建一个channelsynchronized (channelLocks.get(serverAddress)) {//创建一个channelreturn doConnect(serverAddress);}}/*** Release channel to pool if necessary.* 从pool中释放channel* @param channel channel* @param serverAddress server address*/void releaseChannel(Channel channel, String serverAddress) {if (channel == null || serverAddress == null) { return; }try {//加锁synchronized (channelLocks.get(serverAddress)) {Channel ch = channels.get(serverAddress);if (ch == null) {nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);return;}if (ch.compareTo(channel) == 0) {if (LOGGER.isInfoEnabled()) {LOGGER.info("return to pool, rm channel:{}", channel);}destroyChannel(serverAddress, channel);} else {nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);}}} catch (Exception exx) {LOGGER.error(exx.getMessage());}}/*** 销毁 channel.** @param serverAddress server address* @param channel channel*/void destroyChannel(String serverAddress, Channel channel) {if (channel == null) { return; }try {//从缓存中移除if (channel.equals(channels.get(serverAddress))) {channels.remove(serverAddress);}nettyClientKeyPool.returnObject(poolKeyMap.get(serverAddress), channel);} catch (Exception exx) {LOGGER.error("return channel to rmPool error:{}", exx.getMessage());}}/*** Reconnect to remote server of current transaction service group.* 从新连接远程远程服务* @param transactionServiceGroup transaction service group*/void reconnect(String transactionServiceGroup) {List<String> availList = null;try {//从注册中心获取有效服务地址availList = getAvailServerList(transactionServiceGroup);} catch (Exception e) {LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);return;}if (CollectionUtils.isEmpty(availList)) {//error 日志打印String serviceGroup = RegistryFactory.getInstance().getServiceGroup(transactionServiceGroup);LOGGER.error("no available service '{}' found, please make sure registry config correct", serviceGroup);return;}for (String serverAddress : availList) {try {//重新构建channelacquireChannel(serverAddress);} catch (Exception e) {LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);}}}/*** 从pool中作废channel* @param serverAddress tc地址* @param channel* @throws Exception*/void invalidateObject(final String serverAddress, final Channel channel) throws Exception {nettyClientKeyPool.invalidateObject(poolKeyMap.get(serverAddress), channel);}/*** 注册一个channel到本地缓存* @param serverAddress tc地址* @param channel*/void registerChannel(final String serverAddress, final Channel channel) {//判断channel有效写入缓存if (channels.get(serverAddress) != null && channels.get(serverAddress).isActive()) {return;}channels.put(serverAddress, channel);}/*** 创建一个新的channel* @param serverAddress* @return*/private Channel doConnect(String serverAddress) {//再次校验Channel channelToServer = channels.get(serverAddress);if (channelToServer != null && channelToServer.isActive()) {return channelToServer;}Channel channelFromPool;try {//获取reg NettyPoolKeyNettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);//写入缓存NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);//校验是否为RM reg request请求 若是需要写入resourceIdsif (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());}//写入pool中channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));channels.put(serverAddress, channelFromPool);} catch (Exception exx) {LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx);throw new FrameworkException("can not register RM,err:" + exx.getMessage());}return channelFromPool;}private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {//根据服务名称从注册中心中获取有效的服务信息列表List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);if (CollectionUtils.isEmpty(availInetSocketAddressList)) {return Collections.emptyList();}//转化数据格式return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());}private Channel getExistAliveChannel(Channel rmChannel, String serverAddress) {if (rmChannel.isActive()) {return rmChannel;} else {int i = 0;//重新校验channel 是否alive(默认300次-共3s)for (; i < NettyClientConfig.getMaxCheckAliveRetry(); i++) {try {//等待10msThread.sleep(NettyClientConfig.getCheckAliveInternal());} catch (InterruptedException exx) {LOGGER.error(exx.getMessage());}//重新校验rmChannel = channels.get(serverAddress);if (rmChannel != null && rmChannel.isActive()) {return rmChannel;}}//警告 移除无效channelif (i == NettyClientConfig.getMaxCheckAliveRetry()) {LOGGER.warn("channel {} is not active after long wait, close it.", rmChannel);releaseChannel(rmChannel, serverAddress);return null;}}return null;}
}

管理与TC之间的channel的NettyClientChannelManager类

NettyPoolableFactory
public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyPoolableFactory.class);private final AbstractNettyRemotingClient rpcRemotingClient;private final NettyClientBootstrap clientBootstrap;/*** Instantiates a new Netty key poolable factory.** @param rpcRemotingClient the rpc remoting client*/public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {this.rpcRemotingClient = rpcRemotingClient;this.clientBootstrap = clientBootstrap;}//构建一个新的channel@Overridepublic Channel makeObject(NettyPoolKey key) {InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());if (LOGGER.isInfoEnabled()) {LOGGER.info("NettyPool create channel to " + key);}Channel tmpChannel = clientBootstrap.getNewChannel(address);long start = System.currentTimeMillis();Object response;Channel channelToServer = null;if (key.getMessage() == null) {throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());}try {response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());if (!isRegisterSuccess(response, key.getTransactionRole())) {rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());} else {channelToServer = tmpChannel;rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());}} catch (Exception exx) {if (tmpChannel != null) {tmpChannel.close();}throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());}if (LOGGER.isInfoEnabled()) {LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:"+ channelToServer);}return channelToServer;}//判断是否channel 是否reg成功private boolean isRegisterSuccess(Object response, NettyPoolKey.TransactionRole transactionRole) {if (response == null) {return false;}if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {if (!(response instanceof RegisterTMResponse)) {return false;}RegisterTMResponse registerTMResponse = (RegisterTMResponse)response;return registerTMResponse.isIdentified();} else if (transactionRole.equals(NettyPoolKey.TransactionRole.RMROLE)) {if (!(response instanceof RegisterRMResponse)) {return false;}RegisterRMResponse registerRMResponse = (RegisterRMResponse)response;return registerRMResponse.isIdentified();}return false;}private String getVersion(Object response, NettyPoolKey.TransactionRole transactionRole) {if (transactionRole.equals(NettyPoolKey.TransactionRole.TMROLE)) {return ((RegisterTMResponse) response).getVersion();} else {return ((RegisterRMResponse) response).getVersion();}}@Overridepublic void destroyObject(NettyPoolKey key, Channel channel) throws Exception {if (channel != null) {if (LOGGER.isInfoEnabled()) {LOGGER.info("will destroy channel:" + channel);}channel.disconnect();channel.close();}}@Overridepublic boolean validateObject(NettyPoolKey key, Channel obj) {if (obj != null && obj.isActive()) {return true;}if (LOGGER.isInfoEnabled()) {LOGGER.info("channel valid false,channel:" + obj);}return false;}@Overridepublic void activateObject(NettyPoolKey key, Channel obj) throws Exception {}@Overridepublic void passivateObject(NettyPoolKey key, Channel obj) throws Exception {}
}

上述描述对channel管理,包含创建,重连,移除等动作

2.0:异常处理

在GlobalTransactionalInterceptor中描述handleGlobalTransaction()的正常流程,当transactionalTemplate.execute()发生了异常情况,根据同步的异常类型,seata有着不同的处理方式。处理异常类由FailureHandler接口体现,如下所示:

public interface FailureHandler {/*** On begin failure.**/void onBeginFailure(GlobalTransaction tx, Throwable cause);/*** On commit failure.**/void onCommitFailure(GlobalTransaction tx, Throwable cause);/*** On rollback failure.**/void onRollbackFailure(GlobalTransaction tx, Throwable originalException);/*** On rollback retrying**/void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);
}

从上述源码可以看出,FailureHandler中封装了TM与TC交互中基本所有异常的异常处理流程,它的默认实现DefaultFailureHandlerImpl,在GlobalTransactionalInterceptor初始化时被指定,如下所示:

public class DefaultFailureHandlerImpl implements FailureHandler {private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);/*** 重试最大时间默认1个小时 每次重试间隔时间10s 360次 共一个小时*/private static final int RETRY_MAX_TIMES = 6 * 60;//计划间隔秒数 默认10sprivate static final long SCHEDULE_INTERVAL_SECONDS = 10;private static final long TICK_DURATION = 1;private static final int TICKS_PER_WHEEL = 8;//timer 定时时间轮用于定时检测TC中对于事务状态 private HashedWheelTimer timer = new HashedWheelTimer(new NamedThreadFactory("failedTransactionRetry", 1),TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);@Overridepublic void onBeginFailure(GlobalTransaction tx, Throwable cause) {//在begin阶段 无任何业务逻辑执行 无需重试LOGGER.warn("Failed to begin transaction. ", cause);}@Overridepublic void onCommitFailure(GlobalTransaction tx, Throwable cause) {//在全局提交阶段 该阶段发生任何异常 不断测试TC中全局事务状态LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);}@Overridepublic void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {//在全局回滚阶段 该阶段发生任何异常 不断测试TC中全局事务状态LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);//定时器每隔10s进行Rollbacked状态检测timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);}@Overridepublic void onRollbackRetrying(GlobalTransaction tx, Throwable originalException) {StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});timer.newTimeout(new CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,TimeUnit.SECONDS);}/*** 异常重试*/protected class CheckTimerTask implements TimerTask {private final GlobalTransaction tx;//确认状态private final GlobalStatus required;//记录重试次数private int count = 0;//重试标识 直到重试成功private boolean isStopped = false;protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {this.tx = tx;this.required = required;}@Overridepublic void run(Timeout timeout) throws Exception {if (!isStopped) {//if (++count > RETRY_MAX_TIMES) {//超过次数重新再来LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);return;}//通过查询当前事务在TC中状态isStopped = shouldStop(tx, required);//不断重试timer.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);}}}private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {try {//获取tc中该事务当前状态GlobalStatus status = tx.getStatus();LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);//当前全局事务状态为确认或终态时才能结束if (status == required || status == GlobalStatus.Finished) {return true;}} catch (TransactionException e) {LOGGER.error("fetch GlobalTransaction status error", e);}return false;}}

2:seata客户端-RM(基于springcloud项目分析)

上述描述了TM大大致使用流程,在GlobalTransactionScanner初始化时一起被初始化。这里感觉有些服务可能不需要TC而只作为一个分支RM使用,所以这里个人感觉没必要两个都进行初始化,可以根据使用者的选择进行。

2.1:客户端RM client

rm与TM类似,在1.2与1.3版本中对于管理channel类有着不同的命名

public class RMClient {/*** 初始化** @param applicationId           the application id* @param transactionServiceGroup the transaction service group*/public static void init(String applicationId, String transactionServiceGroup) {//单列 获取RmNettyRemotingClientRmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);//设置DefaultResourceManager 单例模式 该类使用策略模式 携有BranchType类型对应ResouceManagerrmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());//设置DefaultRMHandler 单例模式 该类使用策略模式 携有BranchType类型对应RMHandler// 针对不同类型client的分布式事务实现具体使用不同策略rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());//初始化rmNettyRemotingClient.init();}}

RmNettyRemotingClient与TmNettyRemotingClient都继承AbstractNettyRemotingClient了,所以RmNettyRemotingClient的初始化过程与TmNettyRemotingClient基本差不多,所以这里也不再叙述。

2.1:DefaultRMHandler

携有BranchType类型对应RMHandler 针对不同类型client的分布式事务实现具体使用不同策略

public class DefaultRMHandler extends AbstractRMHandler {//记录BranchType(AT XA TCC SAGA) 与其对应4种RMHandlerprotected static Map<BranchType, AbstractRMHandler> allRMHandlersMap= new ConcurrentHashMap<BranchType, AbstractRMHandler>();protected DefaultRMHandler() {initRMHandlers();}protected void initRMHandlers() {List<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);if (CollectionUtils.isNotEmpty(allRMHandlers)) {for (AbstractRMHandler rmHandler : allRMHandlers) {allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);}}}//针对 commit rollbakc undologdel 3种流程的handler方法@Overridepublic BranchCommitResponse handle(BranchCommitRequest request) {return getRMHandler(request.getBranchType()).handle(request);}@Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) {return getRMHandler(request.getBranchType()).handle(request);}@Overridepublic void handle(UndoLogDeleteRequest request) {getRMHandler(request.getBranchType()).handle(request);}protected AbstractRMHandler getRMHandler(BranchType branchType) {return allRMHandlersMap.get(branchType);}@Overrideprotected ResourceManager getResourceManager() {throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");}private static class SingletonHolder {private static AbstractRMHandler INSTANCE = new DefaultRMHandler();}/***单例获取DefaultRMHandler** @return the resource manager*/public static AbstractRMHandler get() {return DefaultRMHandler.SingletonHolder.INSTANCE;}@Overridepublic BranchType getBranchType() {throw new FrameworkException("DefaultRMHandler isn't a real AbstractRMHandler");}
}

父抽象类 AbstractRMHandler封装具体执行流程,并调用底层ResourceManage执行RM数据层逻辑

public abstract class AbstractRMHandler extends AbstractExceptionHandlerimplements RMInboundHandler, TransactionMessageHandler {private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRMHandler.class);//模版方法模式@Overridepublic BranchCommitResponse handle(BranchCommitRequest request) {BranchCommitResponse response = new BranchCommitResponse();//执行异常处理统一模版exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {@Overridepublic void execute(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {doBranchCommit(request, response);}}, request, response);return response;}@Overridepublic BranchRollbackResponse handle(BranchRollbackRequest request) {BranchRollbackResponse response = new BranchRollbackResponse();exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {@Overridepublic void execute(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {doBranchRollback(request, response);}}, request, response);return response;}/*** delete undo log 针对于AT下模式* @param request the request*/@Overridepublic void handle(UndoLogDeleteRequest request) {// https://github.com/seata/seata/issues/2226}/*** Do branch commit.** @param request  the request* @param response the response* @throws TransactionException the transaction exception*/protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);}//根据不同ResourceManager 提交底层流程//获取该分支事务执行状态 后续上报TCBranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch commit result: " + status);}}/*** Do branch rollback.** @param request  the request* @param response the response* @throws TransactionException the transaction exception*/protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)throws TransactionException {String xid = request.getXid();long branchId = request.getBranchId();String resourceId = request.getResourceId();String applicationData = request.getApplicationData();if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);}//获取该分支事务执行状态 后续上报TCBranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);response.setXid(xid);response.setBranchId(branchId);response.setBranchStatus(status);if (LOGGER.isInfoEnabled()) {LOGGER.info("Branch Rollbacked result: " + status);}}/*** get resource manager implement * 对应4种分布式事务branch 模式 AT XA TCC SAGA ** @return*/protected abstract ResourceManager getResourceManager();@Overridepublic AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {if (!(request instanceof AbstractTransactionRequestToRM)) {throw new IllegalArgumentException();}//AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;transactionRequest.setRMInboundMessageHandler(this);return transactionRequest.handle(context);}@Overridepublic void onResponse(AbstractResultMessage response, RpcContext context) {LOGGER.info("the rm client received response msg [{}] from tc server.", response.toString());}public abstract BranchType getBranchType();
}

 

2.2:DefaultResourceManager(ResourceManager)

该类的设计策略与DefaultRMHandler一样,也是使用策略设计模式,内部包含了4种(AT,XA,TCC,SAGA)分布式事务分支对应的具体ResourceManager,由handler中调用,具体执行数据层的分支事务执行。

public class DefaultResourceManager implements ResourceManager {/*** all resource managers*/protected static Map<BranchType, ResourceManager> resourceManagers= new ConcurrentHashMap<>();private DefaultResourceManager() {initResourceManagers();}/*** Get resource manager.** @return the resource manager*/public static DefaultResourceManager get() {return SingletonHolder.INSTANCE;}/*** only for mock** @param branchType* @param rm*/public static void mockResourceManager(BranchType branchType, ResourceManager rm) {resourceManagers.put(branchType, rm);}protected void initResourceManagers() {//初始化所有ResourceManager 并写入缓存中List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);if (CollectionUtils.isNotEmpty(allResourceManagers)) {for (ResourceManager rm : allResourceManagers) {resourceManagers.put(rm.getBranchType(), rm);}}}//分支 resource commit 本质就是删除undo日志@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId,String resourceId, String applicationData)throws TransactionException {return getResourceManager(branchType).branchCommit(branchType, xid, branchId, resourceId, applicationData);}@Overridepublic BranchStatus branchRollback(BranchType branchType, String xid, long branchId,String resourceId, String applicationData)throws TransactionException {return getResourceManager(branchType).branchRollback(branchType, xid, branchId, resourceId, applicationData);}@Overridepublic Long branchRegister(BranchType branchType, String resourceId,String clientId, String xid, String applicationData, String lockKeys)throws TransactionException {return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData,lockKeys);}@Overridepublic void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,String applicationData) throws TransactionException {getResourceManager(branchType).branchReport(branchType, xid, branchId, status, applicationData);}@Overridepublic boolean lockQuery(BranchType branchType, String resourceId,String xid, String lockKeys) throws TransactionException {return getResourceManager(branchType).lockQuery(branchType, resourceId, xid, lockKeys);}@Overridepublic void registerResource(Resource resource) {getResourceManager(resource.getBranchType()).registerResource(resource);}@Overridepublic void unregisterResource(Resource resource) {getResourceManager(resource.getBranchType()).unregisterResource(resource);}@Overridepublic Map<String, Resource> getManagedResources() {Map<String, Resource> allResource = new HashMap<>();for (ResourceManager rm : resourceManagers.values()) {Map<String, Resource> tempResources = rm.getManagedResources();if (tempResources != null) {allResource.putAll(tempResources);}}return allResource;}/*** get ResourceManager by Resource Type** @param branchType* @return*/public ResourceManager getResourceManager(BranchType branchType) {ResourceManager rm = resourceManagers.get(branchType);if (rm == null) {throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());}return rm;}@Overridepublic BranchType getBranchType() {throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");}private static class SingletonHolder {private static DefaultResourceManager INSTANCE = new DefaultResourceManager();}}

这里以AT模式为主 介绍其DatasourceManager(它底层通过AsyncWorker进行异步事务执行)

public class DataSourceManager extends AbstractResourceManager implements Initialize {private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceManager.class);private ResourceManagerInbound asyncWorker;private Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();/*** Sets async worker.** @param asyncWorker the async worker*/public void setAsyncWorker(ResourceManagerInbound asyncWorker) {this.asyncWorker = asyncWorker;}/*** 锁查询 在拥有GlobalLock判断当前是否存在全局锁*/@Overridepublic boolean lockQuery(BranchType branchType, String resourceId, String xid, String lockKeys)throws TransactionException {try {//封装请求GlobalLockQueryRequest request = new GlobalLockQueryRequest();request.setXid(xid);request.setLockKey(lockKeys);request.setResourceId(resourceId);GlobalLockQueryResponse response = null;if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {//请求TC 或者结果response = (GlobalLockQueryResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);} else {throw new RuntimeException("unknow situation!");}if (response.getResultCode() == ResultCode.Failed) {throw new TransactionException(response.getTransactionExceptionCode(),"Response[" + response.getMsg() + "]");}//是否被锁定return response.isLockable();} catch (TimeoutException toe) {throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);} catch (RuntimeException rex) {throw new RmTransactionException(TransactionExceptionCode.LockableCheckFailed, "Runtime", rex);}}@Deprecated@SuppressWarnings("unchecked")private String loadBalance() {InetSocketAddress address = null;try {List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(TmNettyRemotingClient.getInstance().getTransactionServiceGroup());address = LoadBalanceFactory.getInstance().select(inetSocketAddressList);} catch (Exception ignore) {LOGGER.error(ignore.getMessage());}if (address == null) {throw new FrameworkException(NoAvailableService);}return NetUtil.toStringAddress(address);}/*** Init.** @param asyncWorker the async worker*/public synchronized void initAsyncWorker(ResourceManagerInbound asyncWorker) {setAsyncWorker(asyncWorker);}/*** Instantiates a new Data source manager.*/public DataSourceManager() {}@Overridepublic void init() {//创建AsyncWorker并初始化AsyncWorker asyncWorker = new AsyncWorker();asyncWorker.init();initAsyncWorker(asyncWorker);}//注册Resource代理对象 缓存在本地@Overridepublic void registerResource(Resource resource) {DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;//ResourceId 与DataSourceProxy映射关系dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);super.registerResource(dataSourceProxy);}@Overridepublic void unregisterResource(Resource resource) {throw new NotSupportYetException("unregister a resource");}/***获取datasource代理对象** @param resourceId the resource id* @return the data source proxy*/public DataSourceProxy get(String resourceId) {return (DataSourceProxy) dataSourceCache.get(resourceId);}@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);}//二阶段 分支回滚@Overridepublic BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {//获取resource下对应DataSource代理对象DataSourceProxy dataSourceProxy = get(resourceId);if (dataSourceProxy == null) {throw new ShouldNeverHappenException();}try {//一般undo日志存在需要改变的事务数据源下 执行其undo数据UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);} catch (TransactionException te) {StackTraceLogger.info(LOGGER, te,"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;} else {return BranchStatus.PhaseTwo_RollbackFailed_Retryable;}}//返回二阶段回滚return BranchStatus.PhaseTwo_Rollbacked;}@Overridepublic Map<String, Resource> getManagedResources() {return dataSourceCache;}@Overridepublic BranchType getBranchType() {return BranchType.AT;}}
AsyncWorker异步执行分支事务(主要执行RM端的二次提交-删除undo日志):
public class AsyncWorker implements ResourceManagerInbound {private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);//默认private static final int DEFAULT_RESOURCE_SIZE = 16;//避免在高并发下导致存在大量commit请求 一次删除过于庞大 所以定义一个循环下最大的undolog删除数量private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;/*** 2阶段Context 包含commit所需要数据*/private static class Phase2Context {/*** 实例化一个新的2阶段Context** @param branchType      the branchType* @param xid             the xid* @param branchId        the branch id* @param resourceId      the resource id* @param applicationData the application data*/public Phase2Context(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) {this.xid = xid;this.branchId = branchId;this.resourceId = resourceId;this.applicationData = applicationData;this.branchType = branchType;}/*** The Xid.*/String xid;/*** The Branch id.*/long branchId;/*** The Resource id.*/String resourceId;/*** The Application data.*/String applicationData;/*** the branch Type*/BranchType branchType;}//异步提交 buffer数 默认10000private static int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, DEFAULT_CLIENT_ASYNC_COMMIT_BUFFER_LIMIT);//异步提交阻塞队列存储需要提交Phase2Contextprivate static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);//分支提交@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {//新建Phase2Context存储队列if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);}//返回中间状态return BranchStatus.PhaseTwo_Committed;}/*** Init. 在DataSourceManager中被初始化*/public synchronized void init() {LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);//定义调度器ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));//定时调度 没1s调度一次timerExecutor.scheduleAtFixedRate(() -> {try {//执行分支批量提交doBranchCommits();} catch (Throwable e) {LOGGER.info("Failed at async committing ... {}", e.getMessage());}}, 10, 1000 * 1, TimeUnit.MILLISECONDS);}private void doBranchCommits() {if (ASYNC_COMMIT_BUFFER.isEmpty()) {return;}//映射上下文 缓存resourceId 下多次commit请求Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);//从buffer队列中获取 上文提交的Phase2Context数据 直到空队列while (!ASYNC_COMMIT_BUFFER.isEmpty()) {Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();//存在多数据源 多resourceIdList<Phase2Context> contextsGroupedByResourceId = mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());contextsGroupedByResourceId.add(commitContext);}//单独处理每一个resource 数据源for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {Connection conn = null;DataSourceProxy dataSourceProxy;try {try {//获取resource对应 dataSourceProxy 数据源DataSource代理对象DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get().getResourceManager(BranchType.AT);dataSourceProxy = resourceManager.get(entry.getKey());if (dataSourceProxy == null) {throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());}//获取数据库连接对象conn = dataSourceProxy.getPlainConnection();} catch (SQLException sqle) {LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);continue;}//获取需要执行的commit数据List<Phase2Context> contextsGroupedByResourceId = entry.getValue();//封装 xids 与branchIdsSet<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);for (Phase2Context commitContext : contextsGroupedByResourceId) {xids.add(commitContext.xid);branchIds.add(commitContext.branchId);int maxSize = Math.max(xids.size(), branchIds.size());//避免一次性批量删除过多数据  所以这里每过1000条执行一次该数据源下UndoLog清理if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {try {UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);} catch (Exception ex) {LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);}xids.clear();branchIds.clear();}}if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {return;}try {//说明目前数据存量未达到1000条标准 直接删除undologUndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,branchIds, conn);} catch (Exception ex) {LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);}if (!conn.getAutoCommit()) {conn.commit();}} catch (Throwable e) {LOGGER.error(e.getMessage(), e);try {//执行失败回滚conn.rollback();} catch (SQLException rollbackEx) {LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);}} finally {if (conn != null) {try {conn.close();} catch (SQLException closeEx) {LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);}}}}}@Overridepublic BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {throw new NotSupportYetException();}
}

这里可以看出如果在执行undo log批量删除的时候发送错误,导致数据回滚,从而导致undo日志无法删除。这样存储数据堆积风险。seta是通过TC定时发送undo log删除命令给RM做到这些数据的清除,详细参考RmUndoLogProcessor

 

2.3:处理TC传来的Message 的Processor

根据消息类型 (类型可详细参考常量类MessageType )的不同选择使用不同Processor与执行线程,对于这些Processor的注册与TM的描述一致。RmNettyRemotingClient中init中执行。初始化过程如下所示:

 private void registerProcessor() {// 1.registry rm client handle branch commit processorRmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);// 2.registry rm client handle branch rollback processorRmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);// 3.registry rm handler undo log processorRmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);// 4.registry TC response processorClientOnResponseProcessor onResponseProcessor =new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);// 5.registry heartbeat message processorClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);}

2.3.1:处理commit的RmBranchCommitProcessor

处理分支commit的processor,实际就是接受TM发起的二阶段commit,本质就是删除undo日志

public class RmBranchCommitProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);private TransactionMessageHandler handler;private RemotingClient remotingClient;public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {this.handler = handler;this.remotingClient = remotingClient;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//获取TC远程地址String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());//获取返回数据Object msg = rpcMessage.getBody();if (LOGGER.isInfoEnabled()) {LOGGER.info("rm client handle branch commit process:" + msg);}//执行分支commithandleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);}private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {BranchCommitResponse resultMessage;//通过本地branch事务manage执行本地事务resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);if (LOGGER.isDebugEnabled()) {LOGGER.debug("branch commit result:" + resultMessage);}try {//异步向TC汇报本地分支事务执行结果this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);} catch (Throwable throwable) {LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);}}
}

具体handler执行由上文 DefaultRMHandler执行。拿到最终分支执行事务执行结果上报给TC,由TC决定整体事务流程。

2.3.2:处理rollback的RmBranchRollbackProcessor

处理分支rollback的processor,实际就是接受TM发起的二阶段rollback,本质就是执行undo日志,达到回滚目的

public class RmBranchRollbackProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchRollbackProcessor.class);private TransactionMessageHandler handler;private RemotingClient remotingClient;public RmBranchRollbackProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {this.handler = handler;this.remotingClient = remotingClient;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//获取TC地址String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());Object msg = rpcMessage.getBody();if (LOGGER.isInfoEnabled()) {LOGGER.info("rm handle branch rollback process:" + msg);}handleBranchRollback(rpcMessage, remoteAddress, (BranchRollbackRequest) msg);}private void handleBranchRollback(RpcMessage request, String serverAddress, BranchRollbackRequest branchRollbackRequest) {BranchRollbackResponse resultMessage;//本地执行undo 回滚 底层调用链路 handler-》resourceManagerresultMessage = (BranchRollbackResponse) handler.onRequest(branchRollbackRequest, null);if (LOGGER.isDebugEnabled()) {LOGGER.debug("branch rollback result:" + resultMessage);}try {//发送TC分支执行回滚结果this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);} catch (Throwable throwable) {LOGGER.error("send response error: {}", throwable.getMessage(), throwable);}}
}

2.3.3:处理undoLog的RmUndoLogProcessor

处理TC发起的undo log删除命令

/*** 处理TC undo log delete命令* {@link UndoLogDeleteRequest}**/
public class RmUndoLogProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(RmUndoLogProcessor.class);private TransactionMessageHandler handler;public RmUndoLogProcessor(TransactionMessageHandler handler) {this.handler = handler;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//获取tc发送的UndoLogDeleteRequest数据Object msg = rpcMessage.getBody();if (LOGGER.isInfoEnabled()) {LOGGER.info("rm handle undo log process:" + msg);}//执行undo log删除命令handleUndoLogDelete((UndoLogDeleteRequest) msg);}private void handleUndoLogDelete(UndoLogDeleteRequest undoLogDeleteRequest) {try {//底层调用链路 直接在RMHandlerAT中执行handler.onRequest(undoLogDeleteRequest, null);} catch (Exception e) {LOGGER.error("Failed to delete undo log by undoLogDeleteRequest on" + undoLogDeleteRequest.getResourceId());}}
}
RMHandlerAT:处理AT模式下的handler
public class RMHandlerAT extends AbstractRMHandler {private static final Logger LOGGER = LoggerFactory.getLogger(RMHandlerAT.class);private static final int LIMIT_ROWS = 3000;/*** 处理UndoLogDeleteRequest 请求* @param request the request*/@Overridepublic void handle(UndoLogDeleteRequest request) {//获取需要处理的DataSource 代理对象DataSourceManager dataSourceManager = (DataSourceManager)getResourceManager();DataSourceProxy dataSourceProxy = dataSourceManager.get(request.getResourceId());if (dataSourceProxy == null) {LOGGER.warn("Failed to get dataSourceProxy for delete undolog on {}", request.getResourceId());return;}//获取当前时间对应前SaveDays天数(默认7天)Date logCreatedSave = getLogCreated(request.getSaveDays());Connection conn = null;try {conn = dataSourceProxy.getPlainConnection();//记录被删除rowsint deleteRows = 0;do {try {//删除undo表crate时间小于指定天数前3000条数据deleteRows = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).deleteUndoLogByLogCreated(logCreatedSave, LIMIT_ROWS, conn);if (deleteRows > 0 && !conn.getAutoCommit()) {//手动commitconn.commit();}} catch (SQLException exx) {if (deleteRows > 0 && !conn.getAutoCommit()) {conn.rollback();}throw exx;}//每次删除3000 直到数据被删除干净} while (deleteRows == LIMIT_ROWS);} catch (Exception e) {LOGGER.error("Failed to delete expired undo_log, error:{}", e.getMessage(), e);} finally {if (conn != null) {try {conn.close();} catch (SQLException closeEx) {LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);}}}}//获取删除undo条件 时间private Date getLogCreated(int saveDays) {if (saveDays <= 0) {saveDays = UndoLogDeleteRequest.DEFAULT_SAVE_DAYS;}Calendar calendar = Calendar.getInstance();calendar.add(Calendar.DATE, -saveDays);return calendar.getTime();}/*** get AT resource managerDataSourceManager.java** @return*/@Overrideprotected ResourceManager getResourceManager() {return DefaultResourceManager.get().getResourceManager(BranchType.AT);}@Overridepublic BranchType getBranchType() {return BranchType.AT;}}

这样就可以保证RM中undo表的体积不会因为异步删除的原因导致体量变大

2.3.4:处理RM的response消息的ClientOnResponseProcessor

与TM中的ClientOnResponseProcessor功能一致

public class ClientOnResponseProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(ClientOnResponseProcessor.class);/*** 缓存message Id 与merge消息之间映射关系 由AbstractNettyRemotingClient存储*/private Map<Integer, MergeMessage> mergeMsgMap;/*** 缓存每一条message Id(如果是merge中是其中每一条消息) 与MessageFuture映射关系 由AbstractNettyRemoting存储*/private ConcurrentMap<Integer, MessageFuture> futures;/*** To handle the received RPC message on upper level.**/private TransactionMessageHandler transactionMessageHandler;public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,ConcurrentHashMap<Integer, MessageFuture> futures,TransactionMessageHandler transactionMessageHandler) {this.mergeMsgMap = mergeMsgMap;this.futures = futures;this.transactionMessageHandler = transactionMessageHandler;}@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//判断是否是聚合发送消息if (rpcMessage.getBody() instanceof MergeResultMessage) {//获取结果MergeResultMessage results = (MergeResultMessage) rpcMessage.getBody();//移除缓存MergeResultMessage集合MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(rpcMessage.getId());for (int i = 0; i < mergeMessage.msgs.size(); i++) {//处理每一条数据 结果写入future中int msgId = mergeMessage.msgIds.get(i);MessageFuture future = futures.remove(msgId);if (future == null) {if (LOGGER.isInfoEnabled()) {LOGGER.info("msg: {} is not found in futures.", msgId);}} else {//写回结果 发起请求方阻塞等待结果future.setResultMessage(results.getMsgs()[i]);}}} else {//非聚合单条消息MessageFuture messageFuture = futures.remove(rpcMessage.getId());if (messageFuture != null) {messageFuture.setResultMessage(rpcMessage.getBody());} else {if (rpcMessage.getBody() instanceof AbstractResultMessage) {if (transactionMessageHandler != null) {transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);}}}}}
}

2.3.5:处理RM的heartbeat消息的ClientHeartbeatProcessor

处理TC心跳检测的processor

public class ClientHeartbeatProcessor implements RemotingProcessor {private static final Logger LOGGER = LoggerFactory.getLogger(ClientHeartbeatProcessor.class);@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {//接受TC PONG请求if (rpcMessage.getBody() == HeartbeatMessage.PONG) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("received PONG from {}", ctx.channel().remoteAddress());}}}
}

AT模式下整体执行流程图

 

查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. Java中的匿名对象、内部类、匿名内部类、代码块

    一、匿名对象 Java中的匿名对象指的是在创建对象的时候只有创建对象的语句&#xff0c;但是没有创建出来的对象的地址赋值给某个变量&#xff0c;这样的对象叫做匿名对象。 例如&#xff1a; 有如下的一个Person类 public class Person{//姓名private String name;//提供gett…...

    2024/4/1 16:38:31
  2. 我的CNN图像分类学习之路(1)

    我的CNN图像分类学习之路&#xff08;1&#xff09; CNN学习之路 这是我注册CSDN以来的第一篇学习记录&#xff0c;仅当作自己学习之路上的笔记使用&#xff0c;当然如果某些地方可供大家参考使用那将使我倍感荣幸。不多说&#xff0c;开始记录&#xff01;&#xff01; Ale…...

    2024/4/1 16:38:31
  3. reid思路参考

    1、目标跟踪 ReId https://zhuanlan.zhihu.com/p/88925638 1、#Re-ID# FANet&#xff1a;用于行人重识别的端到端前景识别网络 《An End-to-End Foreground-Aware Network for Person Re-Identification》 注&#xff1a;FANet 性能优于HPM和IANetIANet等网络&#xff0c;其在…...

    2024/4/17 7:38:08
  4. 坑1--C/C++ 字符串输入

    C/C 字符串输入#输入函数scanf和cin在输入字符串时遇空格便会停止用char s[100];gets(s);会报错。用 getchar()循环输入但getchar()会读取回车&#xff0c;要把回车消掉#include<iostream> #include<string.h> using namespace std; int main() {char c;char s[100…...

    2024/4/1 16:38:27
  5. markdown实现顺序图

    效果查看 用户注册 mermaid sequenceDiagram 用户 ->> 注册界面: 输入基本信息 注册界面 ->> 注册界面:简单的 判断用户信息 Note right of 注册界面: 填写是否完整&#xff1f; 注册界面 --x用户: 若不合规范重新填写 注册界面 ->> 用户信息类: 请求写入用…...

    2024/5/5 5:41:19
  6. idea 如何上传项目到github

    1.注册github账号 2.IDEA 新建一个maven项目 3.file-->setting-->version control-->github 账号密码登录 4.vcs-->import into version control -->share project on github 5.点击上图share之后弹出github login对话框&#xff0c;登录 6.成功 踩坑&#x…...

    2024/4/15 4:31:10
  7. Python的学习笔记案例4--52周存钱挑战1.0

    52周存钱挑战52周存钱法,即52周阶梯式存钱法,是国际上非常流行的存钱方法。 按照52周存钱法,存钱的人必须在一年52周内,每周递存10元 例子: 第一周存10元,第二周存20元,第三周存30元,一直到52周存520元。 这样一年下来会有多少钱? 10+20+30+40+50+...+520 = 13780比较…...

    2024/4/21 12:40:13
  8. Python的学习笔记案例4--52周存钱挑战2.0

    52周存钱挑战1.0,只是简单把每一周的存钱输出,没有将每周的存钱记录下来,随便拿出一周的存钱数是不可能的。那么下面的2.0就是解决这个问题:记录每周的存钱数新的知识点:列表 列表(list)是有序的元素集合,类似字符串 可通过索引访问单个元素,如l[2],l[-1] 可通过区间…...

    2024/4/1 20:16:46
  9. cb13.10 检查索引的使用情况

    场景&#xff1a; 开始&#xff0c;在某些列上添加了索引&#xff0c;但过了一段时间&#xff0c;某些变化&#xff0c;如应用程序发生了变化&#xff0c;不再需要该索引。如何找出那些未使用的索引&#xff1f; 方法&#xff1a;从慢查询日志获取查询&#xff0c;对查询执行e…...

    2024/4/1 20:16:46
  10. 巧用Python高阶函数,秒提代码逼格

    Python以简单易学&#xff0c;语法简洁易懂&#xff0c;编码优美规范的特点&#xff0c;受到很多程序员和普通人的喜爱&#xff0c;相信和我一样&#xff0c;有很多人在学过Python之后就对其爱不释手了吧&#xff0c;今天给大家分享几个常用的高阶函数&#xff0c;瞬间提升代码…...

    2024/4/6 8:29:58
  11. 普中V2 51单片机 点阵 音乐播放器 蜂鸣器 仿真 proteus

    &#xff08;1&#xff09;无源蜂鸣器4首音乐 &#xff08;2&#xff09;按键1 开始 暂停 &#xff08;3&#xff09;按键2 下一曲 &#xff08;4&#xff09;按键3 上一首 &#xff08;5&#xff09;按键4 加速播放 &#xff08;6&#xff09;歌曲编号1234&#xff0c;放哪一首…...

    2024/4/12 12:11:45
  12. 数据结构与算法专题汇总(六)队列的leetcode题:滑动窗口最大值,二叉树平均值,栈实现队列,队列实现栈

    1.滑动窗口最大值 给定一个数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回滑动窗口中的最大值。 解题思路&#xff1a; 用双端队列解决&#xff0c;窗口移动时…...

    2024/4/25 11:04:21
  13. LeetCode 买卖股票问题总结(动态规划)

    2020年9月15日 周二 天气晴 【不悲叹过去&#xff0c;不荒废现在&#xff0c;不惧怕未来】 本文目录1. 买卖股票的最佳时机&#xff08;k1&#xff09;1.1 DP优化&#xff08;状态压缩&#xff09;1.2 贪心2. 买卖股票的最佳时机 II&#xff08;k infinity&#xff09;2.1 DP优…...

    2024/4/26 7:19:40
  14. 阿里字体图标之Symbol用法

    第一步&#xff1a;下载阿里字体图标Symbol文件包并解压。 第二步&#xff1a;把压缩后的文件全部放入自己的项目文件中。 第1步&#xff1a;引入项目下面生成的 symbol 代码&#xff1a; <script src"./iconfont.js"></script>第2步&#xff1a;加入…...

    2024/5/3 21:28:58
  15. 索引的原理

    数据在磁盘上是以块的形式存储的。为确保对磁盘操作的原子性&#xff0c;访问数据的时候会一并访问所有数据块。磁盘上的这些数据块与链表类似&#xff0c;即它们都包含一个数据段和一个指针&#xff0c;指针指向下一个节点&#xff08;数据块&#xff09;的内存地址&#xff0…...

    2024/4/12 6:20:07
  16. Web程序设计(一)

    在一个探索期&#xff0c;迷茫过、思考过、未曾为此真正努力过的自己&#xff0c;想把学到的短暂知识封印在这里&#xff0c;所以本系列有关Web方面的内容不会涉及太多理论的知识&#xff0c;只是将自己学到的、理解的、实践的内容做一个简单的整理和总结。 一、前言 1.ASP 与…...

    2024/4/25 4:18:36
  17. Python的学习笔记案例4--52周存钱挑战3.0

    前面使用了while循环,还需要特意定义计数的变量,现在不需要计数的变量--i,直接使用for 循环,不需要指定计数的变量。使用for循环语句可以循环遍历整个序列的内容for <x> in <list1>:<bady> 循环变量x在每次循环时,被赋值成对应的元素内容 与while循环的…...

    2024/4/1 20:16:38
  18. 这不是R语言的第二篇博客

    #apply m<-matrix(1:18,nrow3) m apply(m,1,max) apply(m,2,min) #cbind()加列 rbind()加行 m<-matrix(1:18,nrow3) m col.v<-matrix(19:24,nrow3) col.v cbind(m,col.v) m<-matrix(1:18,nrow3) m col.v<-matrix(19:24,nrow1) col.v rbind(m,col.v) #数组 tes…...

    2024/4/26 19:10:09
  19. Hive的安装

    1.首先需安装好mysql service mysql start 2.cd hive mkdir warehouse 3.vi /etc/profile(配制环境变量) export HIVE_HOME export PATH hive --version检验环境变量是否配好 4.cd /hive/conf mv hive-env.sh(改名成这个) (1)vi hive-env.sh HADOOP_HOME实际上的路径 expo…...

    2024/4/21 9:24:28
  20. 2020二级锅炉司炉模拟考试及二级锅炉司炉模拟考试题库

    题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2020二级锅炉司炉模拟考试及二级锅炉司炉模拟考试题库&#xff0c;包含二级锅炉司炉模拟考试答案解析及二级锅炉司炉模拟考试题库练习。由安全生产模拟考试一点通公众号结合国家二级锅炉司炉考试最新大纲及二级锅炉司…...

    2024/4/17 14:19:34

最新文章

  1. 【Qt问题】VS2019 Qt win32项目如何添加x64编译方式

    解决办法&#xff1a; 注意改为x64版本以后&#xff0c;要记得在项目属性里&#xff0c;修改Qt Settings、对应的链接include、lib等 参考文章 VS2019 Qt win32项目如何添加x64编译方式_vs2019没有x64-CSDN博客 有用的知识又增加了~...

    2024/5/5 6:10:11
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/3/20 10:50:27
  3. AI小程序的创业方向:深度思考与逻辑引领

    随着人工智能技术的快速发展&#xff0c;AI小程序逐渐成为创业的新热点。在这个充满机遇与挑战的时代&#xff0c;我们有必要深入探讨AI小程序的创业方向&#xff0c;以把握未来的发展趋势。 一、目标市场定位 首先&#xff0c;我们要明确目标市场。针对不同的用户需求&#x…...

    2024/5/4 1:53:15
  4. 【C++】类和对象①(什么是面向对象 | 类的定义 | 类的访问限定符及封装 | 类的作用域和实例化 | 类对象的存储方式 | this指针)

    目录 前言 什么是面向对象&#xff1f; 类的定义 类的访问限定符及封装 访问限定符 封装 类的作用域 类的实例化 类对象的存储方式 this指针 结语 前言 最早的C版本&#xff08;C with classes&#xff09;中&#xff0c;最先加上的就是类的机制&#xff0c;它构成…...

    2024/5/1 13:18:37
  5. 416. 分割等和子集问题(动态规划)

    题目 题解 class Solution:def canPartition(self, nums: List[int]) -> bool:# badcaseif not nums:return True# 不能被2整除if sum(nums) % 2 ! 0:return False# 状态定义&#xff1a;dp[i][j]表示当背包容量为j&#xff0c;用前i个物品是否正好可以将背包填满&#xff…...

    2024/5/4 12:05:22
  6. 【Java】ExcelWriter自适应宽度工具类(支持中文)

    工具类 import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellType; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet;/*** Excel工具类** author xiaoming* date 2023/11/17 10:40*/ public class ExcelUti…...

    2024/5/4 11:23:32
  7. Spring cloud负载均衡@LoadBalanced LoadBalancerClient

    LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

    2024/5/4 14:46:16
  8. TSINGSEE青犀AI智能分析+视频监控工业园区周界安全防范方案

    一、背景需求分析 在工业产业园、化工园或生产制造园区中&#xff0c;周界防范意义重大&#xff0c;对园区的安全起到重要的作用。常规的安防方式是采用人员巡查&#xff0c;人力投入成本大而且效率低。周界一旦被破坏或入侵&#xff0c;会影响园区人员和资产安全&#xff0c;…...

    2024/5/4 23:54:44
  9. VB.net WebBrowser网页元素抓取分析方法

    在用WebBrowser编程实现网页操作自动化时&#xff0c;常要分析网页Html&#xff0c;例如网页在加载数据时&#xff0c;常会显示“系统处理中&#xff0c;请稍候..”&#xff0c;我们需要在数据加载完成后才能继续下一步操作&#xff0c;如何抓取这个信息的网页html元素变化&…...

    2024/5/4 12:10:13
  10. 【Objective-C】Objective-C汇总

    方法定义 参考&#xff1a;https://www.yiibai.com/objective_c/objective_c_functions.html Objective-C编程语言中方法定义的一般形式如下 - (return_type) method_name:( argumentType1 )argumentName1 joiningArgument2:( argumentType2 )argumentName2 ... joiningArgu…...

    2024/5/4 23:54:49
  11. 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】

    &#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格…...

    2024/5/4 23:54:44
  12. 【ES6.0】- 扩展运算符(...)

    【ES6.0】- 扩展运算符... 文章目录 【ES6.0】- 扩展运算符...一、概述二、拷贝数组对象三、合并操作四、参数传递五、数组去重六、字符串转字符数组七、NodeList转数组八、解构变量九、打印日志十、总结 一、概述 **扩展运算符(...)**允许一个表达式在期望多个参数&#xff0…...

    2024/5/4 14:46:12
  13. 摩根看好的前智能硬件头部品牌双11交易数据极度异常!——是模式创新还是饮鸩止渴?

    文 | 螳螂观察 作者 | 李燃 双11狂欢已落下帷幕&#xff0c;各大品牌纷纷晒出优异的成绩单&#xff0c;摩根士丹利投资的智能硬件头部品牌凯迪仕也不例外。然而有爆料称&#xff0c;在自媒体平台发布霸榜各大榜单喜讯的凯迪仕智能锁&#xff0c;多个平台数据都表现出极度异常…...

    2024/5/4 14:46:11
  14. Go语言常用命令详解(二)

    文章目录 前言常用命令go bug示例参数说明 go doc示例参数说明 go env示例 go fix示例 go fmt示例 go generate示例 总结写在最后 前言 接着上一篇继续介绍Go语言的常用命令 常用命令 以下是一些常用的Go命令&#xff0c;这些命令可以帮助您在Go开发中进行编译、测试、运行和…...

    2024/5/4 14:46:11
  15. 用欧拉路径判断图同构推出reverse合法性:1116T4

    http://cplusoj.com/d/senior/p/SS231116D 假设我们要把 a a a 变成 b b b&#xff0c;我们在 a i a_i ai​ 和 a i 1 a_{i1} ai1​ 之间连边&#xff0c; b b b 同理&#xff0c;则 a a a 能变成 b b b 的充要条件是两图 A , B A,B A,B 同构。 必要性显然&#xff0…...

    2024/5/5 2:25:33
  16. 【NGINX--1】基础知识

    1、在 Debian/Ubuntu 上安装 NGINX 在 Debian 或 Ubuntu 机器上安装 NGINX 开源版。 更新已配置源的软件包信息&#xff0c;并安装一些有助于配置官方 NGINX 软件包仓库的软件包&#xff1a; apt-get update apt install -y curl gnupg2 ca-certificates lsb-release debian-…...

    2024/5/4 21:24:42
  17. Hive默认分割符、存储格式与数据压缩

    目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限&#xff08;ROW FORMAT&#xff09;配置标准HQL为&#xff1a; ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...

    2024/5/4 12:39:12
  18. 【论文阅读】MAG:一种用于航天器遥测数据中有效异常检测的新方法

    文章目录 摘要1 引言2 问题描述3 拟议框架4 所提出方法的细节A.数据预处理B.变量相关分析C.MAG模型D.异常分数 5 实验A.数据集和性能指标B.实验设置与平台C.结果和比较 6 结论 摘要 异常检测是保证航天器稳定性的关键。在航天器运行过程中&#xff0c;传感器和控制器产生大量周…...

    2024/5/4 13:16:06
  19. --max-old-space-size=8192报错

    vue项目运行时&#xff0c;如果经常运行慢&#xff0c;崩溃停止服务&#xff0c;报如下错误 FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory 因为在 Node 中&#xff0c;通过JavaScript使用内存时只能使用部分内存&#xff08;64位系统&…...

    2024/5/4 16:48:41
  20. 基于深度学习的恶意软件检测

    恶意软件是指恶意软件犯罪者用来感染个人计算机或整个组织的网络的软件。 它利用目标系统漏洞&#xff0c;例如可以被劫持的合法软件&#xff08;例如浏览器或 Web 应用程序插件&#xff09;中的错误。 恶意软件渗透可能会造成灾难性的后果&#xff0c;包括数据被盗、勒索或网…...

    2024/5/4 14:46:05
  21. JS原型对象prototype

    让我简单的为大家介绍一下原型对象prototype吧&#xff01; 使用原型实现方法共享 1.构造函数通过原型分配的函数是所有对象所 共享的。 2.JavaScript 规定&#xff0c;每一个构造函数都有一个 prototype 属性&#xff0c;指向另一个对象&#xff0c;所以我们也称为原型对象…...

    2024/5/5 3:37:58
  22. C++中只能有一个实例的单例类

    C中只能有一个实例的单例类 前面讨论的 President 类很不错&#xff0c;但存在一个缺陷&#xff1a;无法禁止通过实例化多个对象来创建多名总统&#xff1a; President One, Two, Three; 由于复制构造函数是私有的&#xff0c;其中每个对象都是不可复制的&#xff0c;但您的目…...

    2024/5/4 23:54:30
  23. python django 小程序图书借阅源码

    开发工具&#xff1a; PyCharm&#xff0c;mysql5.7&#xff0c;微信开发者工具 技术说明&#xff1a; python django html 小程序 功能介绍&#xff1a; 用户端&#xff1a; 登录注册&#xff08;含授权登录&#xff09; 首页显示搜索图书&#xff0c;轮播图&#xff0…...

    2024/5/4 9:07:39
  24. 电子学会C/C++编程等级考试2022年03月(一级)真题解析

    C/C++等级考试(1~8级)全部真题・点这里 第1题:双精度浮点数的输入输出 输入一个双精度浮点数,保留8位小数,输出这个浮点数。 时间限制:1000 内存限制:65536输入 只有一行,一个双精度浮点数。输出 一行,保留8位小数的浮点数。样例输入 3.1415926535798932样例输出 3.1…...

    2024/5/4 14:46:02
  25. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  26. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  27. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  28. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  29. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  30. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  31. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  32. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  33. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  34. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  35. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  36. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  37. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  38. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  39. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  40. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  41. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  42. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  43. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  44. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57