Spring具体事务处理器的实现
1.Spring的事务处理中,通用的事务处理流程框架是由抽象事务管理器AbstractPlatformTransactionManager来提供的,而具体的底层事务处理实现,由PlatformTransactionManager的具体实现类来实现,如 DataSourceTransactionManager 、JtaTransactionManager和 HibernateTransactionManager等。我们以Spring中最常用的DataSourceTransactionManager和 HibernateTransactionManager为例,来分析具体事务处理器的底层实现事务创建、提交和回滚的处理操作。
2.AbstractPlatformTransactionManager抽象事物处理器:
上一篇博客中,我们已经分析了抽象事物管理器AbstractPlatformTransactionManager的源码,了解它实现了PlatformTransactionManager平台事务管理器接口,提供了一系列设计好的事务模板方法,如事务提交、回滚等,这些模板方法的具体实现由具体的事务处理器来提供。
3.DataSourceTransactionManager事务处理器的实现:
DataSourceTransactionManager数据源事务处理器是针对JDBC连接提供的事务处理器实现,即数据源事务处理器把数据库Connection连接和当前线程进行绑定,通过直接调用数据库连接Connection的提交和回滚方法实现事务的提供和回滚处理。其源码如下:
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { //注入数据源 private DataSource dataSource; //数据源事务处理器默认构造方法,创建一个数据源事务处理器实例,并设置允许嵌套事务 public DataSourceTransactionManager() { setNestedTransactionAllowed(true); } //根据给定数据源,创建一个数据源事务处理器实例 public DataSourceTransactionManager(DataSource dataSource) { this(); setDataSource(dataSource); afterPropertiesSet(); } //设置数据源 public void setDataSource(DataSource dataSource) { if (dataSource instanceof TransactionAwareDataSourceProxy) { //如果数据源是一个事务包装数据源代理,则获取事务包装代理的目标数据源 this.dataSource = ((TransactionAwareDataSourceProxy) dataSource).getTargetDataSource(); } else { this.dataSource = dataSource; } } //获取数据源 public DataSource getDataSource() { return this.dataSource; } //数据源事务处理器对象构造方法的回调函数 public void afterPropertiesSet() { if (getDataSource() == null) { throw new IllegalArgumentException("Property ‘dataSource‘ is required"); } } public Object getResourceFactory() { return getDataSource(); } //创建事务,对数据库而言,是由Connection来完成事务工作的。该方法把数据库的//Connection对象放到一个ConnectionHolder对象中,然后封装到一个 //DataSourceTransactionObject对象中 protected Object doGetTransaction() { //创建数据源事务对象 DataSourceTransactionObject txObject = new DataSourceTransactionObject(); //设置数据源事务对象对嵌套事务使用保存点 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //从事务管理容器中获取存放数据库Connection的对象 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); txObject.setConnectionHolder(conHolder, false); return txObject; } //判断是否已经存在事务 protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //根据存放数据库连接的ConnectionHolder的isTransactionActive属性来判断 return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); } //处理事务开始的方法 protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { //如果数据源事务对象的ConnectionHolder为null或者是事务同步的 if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { //获取当前数据源的数据库连接 Connection newCon = this.dataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //为数据源事务对象设置ConnectionHolder txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } //设置数据源事务对象的事务同步 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); //获取数据源事务对象的数据库连接 con = txObject.getConnectionHolder().getConnection(); //根据数据连接和事务属性,获取数据库连接的事务隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); //为数据源事务对象设置事务隔离级别 txObject.setPreviousIsolationLevel(previousIsolationLevel); //如果数据库连接设置了自动事务提交属性,则关闭自动提交 if (con.getAutoCommit()) { //保存数据库连接设置的自动连接到数据源事务对象中 txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } //设置数据库连接自动事务提交属性为false,即禁止自动事务提交 con.setAutoCommit(false); } //激活当前数据源事务对象的事务配置 txObject.getConnectionHolder().setTransactionActive(true); //获取事务配置的超时时长 int timeout = determineTimeout(definition); //如果事务配置的超时时长不等于事务的默认超时时长 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { //数据源事务对象设置超时时长 txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //把当前数据库Connection和线程绑定 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Exception ex) { DataSourceUtils.releaseConnection(con, this.dataSource); throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } } //事务挂起 protected Object doSuspend(Object transaction) { //获取事务对象 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //将事务对象中的ConnectionHolders设置为null txObject.setConnectionHolder(null); ConnectionHolder conHolder = (ConnectionHolder) //解除事务对象和当前线程的绑定 TransactionSynchronizationManager.unbindResource(this.dataSource); return conHolder; } //事务恢复 protected void doResume(Object transaction, Object suspendedResources) { //获取已暂停事务的ConnectionHolder ConnectionHolder conHolder = (ConnectionHolder) suspendedResources; //重新将事务对象和当前线程绑定 TransactionSynchronizationManager.bindResource(this.dataSource, conHolder); } //事务提交 protected void doCommit(DefaultTransactionStatus status) { //获取事务对象 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); //通过事务对象获取数据库连接 Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { //使用数据库连接手动进行事务提交 con.commit(); } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } } //事务回滚 protected void doRollback(DefaultTransactionStatus status) { //获取事务对象 DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); //通过事务对象获取数据库连接 Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { //通过调用数据库连接的回滚方法完成事务回滚操作 con.rollback(); } catch (SQLException ex) { throw new TransactionSystemException("Could not roll back JDBC transaction", ex); } } //设置回滚 protected void doSetRollbackOnly(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only"); } txObject.setRollbackOnly(); } //操作完成之后清除操作 protected void doCleanupAfterCompletion(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; //移除当前线程绑定的ConnectionHolder if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); } Connection con = txObject.getConnectionHolder().getConnection(); try { //如果事务对象保存了自动事务提交属性,则设置数据库连接的自动事务提交属性 if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } //事务结束后重置数据库连接 DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel()); } catch (Throwable ex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } //如果事务对象中有新的ConnectionHolder if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } //释放数据库连接 DataSourceUtils.releaseConnection(con, this.dataSource); } //清除事务对象的ConnectionHolder txObject.getConnectionHolder().clear(); } //数据源事务对象,内部类 private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport { //是否有新的ConnectionHolder private boolean newConnectionHolder; //是否保存自动提交 private boolean mustRestoreAutoCommit; //设置ConnectionHolder public void setConnectionHolder(ConnectionHolder connectionHolder, boolean newConnectionHolder) { //为父类JdbcTransactionObjectSupport设置ConnectionHolder super.setConnectionHolder(connectionHolder); this.newConnectionHolder = newConnectionHolder; } public boolean isNewConnectionHolder() { return this.newConnectionHolder; } //调用父类JdbcTransactionObjectSupport的相关方法,查询收费存在事务 public boolean hasTransaction() { return (getConnectionHolder() != null && getConnectionHolder().isTransactionActive()); } //设置是否保存自动提交 public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) { this.mustRestoreAutoCommit = mustRestoreAutoCommit; } public boolean isMustRestoreAutoCommit() { return this.mustRestoreAutoCommit; } //设置数据库连接在操作失败是,是否只回滚处理 public void setRollbackOnly() { getConnectionHolder().setRollbackOnly(); } public boolean isRollbackOnly() { return getConnectionHolder().isRollbackOnly(); } } }
通过上述对数据源事务处理器的源码分析,我们看到,事务的提交、回滚等操作是通过直接调用数据库连接Connection的提交和回滚方法实现的,由于自动事务提交对应用程序性能影响很大,因此在进行事务提交时,我们首先禁止数据库连接的自动事务提交,事务提供操作通过手动实现。
4.HibernateTransactionManager事务处理器的实现:
相对于数据源的事务处理器来说,Hibernate的事务处理器相对要复杂一些,它是通过对Hibernate的会话Session的管理来完成事务处理实现的。Hibernate事务处理器的事务处理相关源码如下:
public class HibernateTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, BeanFactoryAware, InitializingBean { …… //获取Hibernate事务 protected Object doGetTransaction() { //创建Hibernate事务对象 HibernateTransactionObject txObject = new HibernateTransactionObject(); //根据是否允许嵌套事务设置事务对象是否允许保存点 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //从线程中获取SessionHolder,SessionHolder是在事务开始时与线程绑定的。 SessionHolder sessionHolder = (SessionHolder) TransactionSynchronizationManager.getResource(getSessionFactory()); //如果获取到的SessionHolder不为null if (sessionHolder != null) { if (logger.isDebugEnabled()) { logger.debug("Found thread-bound Session [" + SessionFactoryUtils.toString(sessionHolder.getSession()) + "] for Hibernate transaction"); } //把获取到的SessionHolder设置到Hibernate事务对象中 txObject.setSessionHolder(sessionHolder); } //如果当前Hibernate事务处理器有被管理的Hibernate Session else if (this.hibernateManagedSession) { try { //获取当前的Hibernate Session Session session = getSessionFactory().getCurrentSession(); if (logger.isDebugEnabled()) { logger.debug("Found Hibernate-managed Session [" + SessionFactoryUtils.toString(session) + "] for Spring-managed transaction"); } //设置Hibernate事务对象已经存在指定的Session txObject.setExistingSession(session); } catch (HibernateException ex) { throw new DataAccessResourceFailureException( "Could not obtain Hibernate-managed Session for Spring-managed transaction", ex); } } //如果获取到的数据源不为null if (getDataSource() != null) { //将获取到的数据源和当前线程绑定 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(getDataSource()); txObject.setConnectionHolder(conHolder); } return txObject; } //是否已存在事务 protected boolean isExistingTransaction(Object transaction) { HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; //根据事务对象是否存在Spring管理的事务,或者通过判断是否存在Hibernate //Session或者事务对象中有被Hibernate管理的事务 return (txObject.hasSpringManagedTransaction() || (this.hibernateManagedSession && txObject.hasHibernateManagedTransaction())); } //处理事务开始 protected void doBegin(Object transaction, TransactionDefinition definition) { //获取事务对象 HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; //如果事务对象有ConnectionHolder,且事务对象的数据库连接不是事务同步的 if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) { throw new IllegalTransactionStateException( "Pre-bound JDBC Connection found! HibernateTransactionManager does not support " + "running within DataSourceTransactionManager if told to manage the DataSource itself. " + "It is recommended to use a single HibernateTransactionManager for all transactions " + "on a single DataSource, no matter whether Hibernate or JDBC access."); } Session session = null; try { //如果事务对象的SessionHolder为null,或者事务对象Hibernate //Session是事务同步的 if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) { //获取Hibernate事务处理器中的实体拦截器 Interceptor entityInterceptor = getEntityInterceptor(); //获取Hibernate Session,如果实体拦截器不为null,则打开指定 //实体拦截器的Session,如果实体拦截器为null,则打开新Session Session newSession = (entityInterceptor != null ? getSessionFactory().openSession(entityInterceptor) : getSessionFactory().openSession()); if (logger.isDebugEnabled()) { logger.debug("Opened new Session [" + SessionFactoryUtils.toString(newSession) + "] for Hibernate transaction"); } //将获取的Hibernate Session设置到事务对象中 txObject.setSession(newSession); } //如果Hibernate事务处理器中的SessionHolder不为null,则 //获取SessionHolder中已有的Hibernate Session session = txObject.getSessionHolder().getSession(); //允许为JDBC连接改变事务设置 if (this.prepareConnection && isSameConnectionForEntireSession(session)) { if (logger.isDebugEnabled()) { logger.debug( "Preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]"); } //获取Session连接 Connection con = session.connection(); //获取事务的隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); //设置事务对象的事务隔离级别 txObject.setPreviousIsolationLevel(previousIsolationLevel); } //不允许为JDBC连接改成事务设置 else { //如果事务隔离级别不是默认事务隔离级别 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { throw new InvalidIsolationLevelException( "HibernateTransactionManager is not allowed to support custom isolation levels: " + "make sure that its ‘prepareConnection‘ flag is on (the default) and that the " + "Hibernate connection release mode is set to ‘on_close‘ (SpringTransactionFactory‘s default). " + "Make sure that your LocalSessionFactoryBean actually uses SpringTransactionFactory: Your " + "Hibernate properties should *not* include a ‘hibernate.transaction.factory_class‘ property!"); } if (logger.isDebugEnabled()) { logger.debug( "Not preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]"); } } //如果事务是只读,且事务对象是新的Hibernate Session if (definition.isReadOnly() && txObject.isNewSession()) { //设置Hibernate Session刷新模式为手动 session.setFlushMode(FlushMode.MANUAL); } //如果事务是非只读的,且事务对象不是新Hibernate Session if (!definition.isReadOnly() && !txObject.isNewSession()) { //或者Hibernate的刷新模式 FlushMode flushMode = session.getFlushMode(); //设置Session的刷新模式 if (flushMode.lessThan(FlushMode.COMMIT)) { session.setFlushMode(FlushMode.AUTO); //为事务对象设置刷新模式 txObject.getSessionHolder().setPreviousFlushMode(flushMode); } } Transaction hibTx; //获取事务超时时长 int timeout = determineTimeout(definition); //如果事务配置的超时时长不是事务默认超时时长 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { //获取Hibernate Session事务 hibTx = session.getTransaction(); //为事务对象设置超时时长 hibTx.setTimeout(timeout); //开启事务 hibTx.begin(); } //如果事务配置的超时时长是默认超时时长 else { //通过Hibernate Session直接开启事务 hibTx = session.beginTransaction(); } //把事务设置到事务对象的SessionHolder中,并且线程绑定 txObject.getSessionHolder().setTransaction(hibTx); //如果数据源不为null,即设置了数据源 if (getDataSource() != null) { //使用Hibernate Session打开数据库连接 Connection con = session.connection(); //创建ConnectionHolder ConnectionHolder conHolder = new ConnectionHolder(con); //设置超时时长 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { conHolder.setTimeoutInSeconds(timeout); } if (logger.isDebugEnabled()) { logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]"); } //将数据源和JDBC ConnectionHolder绑定到当前线程 TransactionSynchronizationManager.bindResource(getDataSource(), conHolder); //将创建的JDBC ConnectionHolder设置到事务对象中 txObject.setConnectionHolder(conHolder); } //如果事务对象中的SessionHolder是新的 if (txObject.isNewSessionHolder()) { //当SessionHolder和当前线程绑定起来 TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder()); } //设置事务对象中的SessionHolder是事务同步的 txObject.getSessionHolder().setSynchronizedWithTransaction(true); } //事务开启过程中异常处理 catch (Exception ex) { if (txObject.isNewSession()) { try { //如果Session的事务上激活的,回滚Session的事务 if (session.getTransaction().isActive()) { session.getTransaction().rollback(); } } catch (Throwable ex2) { logger.debug("Could not rollback Session after failed transaction begin", ex); } finally { //关闭Session SessionFactoryUtils.closeSession(session); } } throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex); } } //事务挂起 protected Object doSuspend(Object transaction) { HibernateTransactionObject txObject = (HibernateTransactionObject) transaction; //把当前的SessionHolder从线程中和事务对象中释放 txObject.setSessionHolder(null); //解析SessionHolder和线程的绑定 SessionHolder sessionHolder = (SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory()); txObject.setConnectionHolder(null); ConnectionHolder connectionHolder = null; //解除数据源和线程的绑定 if (getDataSource() != null) { connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource()); } return new SuspendedResourcesHolder(sessionHolder, connectionHolder); } //事务恢复 protected void doResume(Object transaction, Object suspendedResources) { SuspendedResourcesHolder resourcesHolder = (SuspendedResourcesHolder) suspendedResources; //如果事务管理器中有SessionFactory if (TransactionSynchronizationManager.hasResource(getSessionFactory())) { //解除SessionFactory和当前线程的绑定 TransactionSynchronizationManager.unbindResource(getSessionFactory()); } //如果事务管理器中没有SessionFactory,则将Session和当前线程绑定 TransactionSynchronizationManager.bindResource(getSessionFactory(), resourcesHolder.getSessionHolder()); if (getDataSource() != null) { TransactionSynchronizationManager.bindResource(getDataSource(), resourcesHolder.getConnectionHolder()); } } //准备提交 protected void prepareForCommit(DefaultTransactionStatus status) { //如果事务配置为FlushBeforeCommit,并且是新事务 if (this.earlyFlushBeforeCommit && status.isNewTransaction()) { //获取事务对象 HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction(); //回去事务对象中的Session Session session = txObject.getSessionHolder().getSession(); //如果Session的刷新模式不低于COMMIT if (!session.getFlushMode().lessThan(FlushMode.COMMIT)) { logger.debug("Performing an early flush for Hibernate transaction"); try { //刷新Session session.flush(); } catch (HibernateException ex) { throw convertHibernateAccessException(ex); } finally { //把Session的刷新模式设置为MANUAL session.setFlushMode(FlushMode.MANUAL); } } } } //提交处理 protected void doCommit(DefaultTransactionStatus status) { //获取当前的Hibernate事务对象 HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Committing Hibernate transaction on Session [" +SessionFactoryUtils.toString(txObject.getSessionHolder().getSession()) + "]"); } try { //通过Hibernate事务完成提交 txObject.getSessionHolder().getTransaction().commit(); } catch (org.hibernate.TransactionException ex) { throw new TransactionSystemException("Could not commit Hibernate transaction", ex); } catch (HibernateException ex) { throw convertHibernateAccessException(ex); } } //回滚处理 protected void doRollback(DefaultTransactionStatus status) { //获取Hibernate事务对象 HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction(); if (status.isDebug()) { logger.debug("Rolling back Hibernate transaction on Session ["+SessionFactoryUtils.toString(txObject.getSessionHolder().getSession()) + "]"); } try { //通过Hibernate事务执行回滚操作 txObject.getSessionHolder().getTransaction().rollback(); } catch (org.hibernate.TransactionException ex) { throw new TransactionSystemException("Could not roll back Hibernate transaction", ex); } catch (HibernateException ex) { throw convertHibernateAccessException(ex); } finally { if (!txObject.isNewSession() && !this.hibernateManagedSession) { //清除事务对象中的Hibernate Session txObject.getSessionHolder().getSession().clear(); } } } …… }
通过上面对Hibernate事务处理器的分析,我们看到真正执行提交、回滚等事务操作的还是Hibernate Transaction事务对象,这与单独直接使用Hibernate没有什么区别,只是Spring将其做了通用封装,更加方便使用。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。