mysql分布式事务01
分布式事务组成
- 资源管理器: 提供访问事务资源的方法,通常一个数据库就是一个资源管理器
- 事务管理器: 协调参与全局事务的各个事务。需要和参与全局事务中的所有资源管理器进行通信。
- 应用程序: 定义事务的边界,指定全局事务中的操作。
提交方式
两段式提交。在第一个阶段,所有参与全局事务的节点都开始准备,告诉事务管理器他们准备提交了;第二阶段,事务管理器告诉资源管理器执行commit或者时rollback,如果任意一个节点显示不能提交,则所有的节点进行回滚。
代码示例:
- MyXid.java
public class MyXid implements Xid{
public int formatId;
public byte gtrid[];
public byte bqual[];
public MyXid() {
}
public MyXid(int formatId, byte gtrid[], byte bqual[]) {
this.formatId = formatId;
this.gtrid = gtrid;
this.bqual = bqual;
}
@Override
public byte[] getBranchQualifier() {
return bqual;
}
@Override
public int getFormatId() {
return formatId;
}
@Override
public byte[] getGlobalTransactionId() {
return gtrid;
}
}
-
Demo1.java
```java public class Demo1 {
/**
-
分布式数据库
-
@param url
-
@param user
-
@param pass
-
@return */ public static MysqlXADataSource getDataSource(String url, String user, String pass) { MysqlXADataSource mysqlXADataSource = new MysqlXADataSource(); mysqlXADataSource.setUrl(url); mysqlXADataSource.setUser(user); mysqlXADataSource.setPassword(pass); return mysqlXADataSource; }
public static void main(String[] args) throws SQLException { String connStr1 = “jdbc:mysql://192.168.200.137:3306/bank_shanghai”; //数据库1 String connStr2 = “jdbc:mysql://127.0.0.1:3306/bank_beijing”; //数据库2
MysqlXADataSource ds1 = getDataSource(connStr1,“root”,“123456”); MysqlXADataSource ds2 = getDataSource(connStr2, “root”, “Gepoint”);
XAConnection xaConnection1 = ds1.getXAConnection(); XAConnection xaConnection2 = ds2.getXAConnection();
XAResource xaResource1 = xaConnection1.getXAResource(); XAResource xaResource2 = xaConnection2.getXAResource();
Connection connection1 = xaConnection1.getConnection(); //获得连接 Connection connection2 = xaConnection2.getConnection(); //获得连接
Statement statement1 = connection1.createStatement(); Statement statement2 = connection2.createStatement();
Xid xid1 = new MyXid(100, new byte[] {0x01},new byte[] {0x02}); Xid xid2 = new MyXid(100, new byte[] {0x11}, new byte[] {0x12});
try { xaResource1.start(xid1, XAResource.TMNOFLAGS); statement1.execute(“UPDATE money SET avail='22292’ WHERE id= 12”); xaResource1.end(xid1, XAResource.TMSUCCESS); xaResource2.start(xid2, XAResource.TMNOFLAGS); statement2.execute(“UPDATE mone SET avail='22293’ WHERE id= 12”);//此处错误,会导致整个全局事务回滚 xaResource2.end(xid2, XAResource.TMSUCCESS); int ret2 = xaResource2.prepare(xid2); //第一阶段 int ret1 = xaResource1.prepare(xid1); if(ret1==XAResource.XA_OK && ret2==XAResource.XA_OK) { xaResource1.commit(xid1, false); //第二阶段 xaResource2.commit(xid2, false); }
} catch (Exception e) { e.printStackTrace(); }
-
}
} ```