mysql分布式事务01

分布式事务组成

  1. 资源管理器: 提供访问事务资源的方法,通常一个数据库就是一个资源管理器
  2. 事务管理器: 协调参与全局事务的各个事务。需要和参与全局事务中的所有资源管理器进行通信。
  3. 应用程序: 定义事务的边界,指定全局事务中的操作。

提交方式

两段式提交。在第一个阶段,所有参与全局事务的节点都开始准备,告诉事务管理器他们准备提交了;第二阶段,事务管理器告诉资源管理器执行commit或者时rollback,如果任意一个节点显示不能提交,则所有的节点进行回滚。

代码示例:

  1. MyXid.java

    public class MyXid implements Xid{
    	
    	public int formatId;
    	public byte gtrid[];
    	public byte bqual[];
    
    	public MyXid() {
    		
    	}
    	
    	public MyXid(int formatId, byte gtrid[], byte bqual[]) {
    		this.formatId = formatId;
    		this.gtrid = gtrid;
    		this.bqual = bqual;
    	}
    	
    	@Override
    	public byte[] getBranchQualifier() {
    		return bqual;
    	}
    
    	@Override
    	public int getFormatId() {
    		return formatId;
    	}
    
    	@Override
    	public byte[] getGlobalTransactionId() {
    		return gtrid;
    	}
    
    }
    
  2. Demo1.java

    public class Demo1 {
    	
    	/**
    	 * 分布式数据库
    	 * @param url
    	 * @param user
    	 * @param pass
    	 * @return
    	 */
    	public static MysqlXADataSource getDataSource(String url, String user, String pass) {
    		MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
    		mysqlXADataSource.setUrl(url);
    		mysqlXADataSource.setUser(user);
    		mysqlXADataSource.setPassword(pass);
    		return mysqlXADataSource;
    	}
    	
    	public static void main(String[] args) throws SQLException {
    		String connStr1 = "jdbc:mysql://192.168.200.137:3306/bank_shanghai";	//数据库1
    		String connStr2 = "jdbc:mysql://127.0.0.1:3306/bank_beijing";			//数据库2
    		
    		MysqlXADataSource ds1 = getDataSource(connStr1,"root","123456");
    		MysqlXADataSource ds2 = getDataSource(connStr2, "root", "Gepoint");
    		
    		XAConnection xaConnection1 = ds1.getXAConnection();
    		XAConnection xaConnection2 = ds2.getXAConnection();
    		
    		XAResource xaResource1 = xaConnection1.getXAResource();
    		XAResource xaResource2 = xaConnection2.getXAResource();
    		
    		Connection connection1 = xaConnection1.getConnection();	//获得连接
    		Connection connection2 = xaConnection2.getConnection();	//获得连接
    		
    		Statement statement1 = connection1.createStatement();
    		Statement statement2 = connection2.createStatement();
    		
    		Xid xid1 = new MyXid(100, new byte[] {0x01},new byte[] {0x02});
    		Xid xid2 = new MyXid(100, new byte[] {0x11}, new byte[] {0x12});
    		
    		try {
    			xaResource1.start(xid1, XAResource.TMNOFLAGS);
    			statement1.execute("UPDATE money SET avail='22292' WHERE id= 12");
    			xaResource1.end(xid1, XAResource.TMSUCCESS);
    			xaResource2.start(xid2, XAResource.TMNOFLAGS);
    			statement2.execute("UPDATE mone SET avail='22293' WHERE id= 12");//此处错误,会导致整个全局事务回滚
    			xaResource2.end(xid2, XAResource.TMSUCCESS);
    			int ret2 = xaResource2.prepare(xid2);			//第一阶段
    			int ret1 = xaResource1.prepare(xid1);
    			if(ret1==XAResource.XA_OK && ret2==XAResource.XA_OK) {
    				xaResource1.commit(xid1, false);			//第二阶段
    				xaResource2.commit(xid2, false);
    			}
    			
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		
    		
    	}
    	
    }