zx.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "context"
  8. "database/sql"
  9. "database/sql/driver"
  10. "errors"
  11. "io"
  12. "regexp"
  13. "strings"
  14. "time"
  15. "gitee.com/chunanyong/dm/util"
  16. )
  17. const (
  18. SQL_SELECT_STANDBY = "select distinct mailIni.inst_name, mailIni.INST_IP, mailIni.INST_PORT, archIni.arch_status " +
  19. "from v$arch_status archIni " +
  20. "left join (select * from V$DM_MAL_INI) mailIni on archIni.arch_dest = mailIni.inst_name " +
  21. "left join V$MAL_LINK_STATUS on CTL_LINK_STATUS = 'CONNECTED' AND DATA_LINK_STATUS = 'CONNECTED' " +
  22. "where archIni.arch_type in ('TIMELY', 'REALTIME') AND archIni.arch_status = 'VALID'"
  23. SQL_SELECT_STANDBY2 = "select distinct " +
  24. "mailIni.mal_inst_name, mailIni.mal_INST_HOST, mailIni.mal_INST_PORT, archIni.arch_status " +
  25. "from v$arch_status archIni " + "left join (select * from V$DM_MAL_INI) mailIni " +
  26. "on archIni.arch_dest = mailIni.mal_inst_name " + "left join V$MAL_LINK_STATUS " +
  27. "on CTL_LINK_STATUS = 'CONNECTED' AND DATA_LINK_STATUS = 'CONNECTED' " +
  28. "where archIni.arch_type in ('TIMELY', 'REALTIME') AND archIni.arch_status = 'VALID'"
  29. )
  30. type rwUtil struct {
  31. }
  32. var RWUtil = rwUtil{}
  33. func (RWUtil rwUtil) connect(c *DmConnector, ctx context.Context) (*DmConnection, error) {
  34. c.loginMode = LOGIN_MODE_PRIMARY_ONLY
  35. connection, err := c.connect(ctx)
  36. if err != nil {
  37. return nil, err
  38. }
  39. connection.rwInfo.rwCounter = getRwCounterInstance(connection, connection.StandbyCount)
  40. err = RWUtil.connectStandby(connection)
  41. return connection, err
  42. }
  43. func (RWUtil rwUtil) reconnect(connection *DmConnection) error {
  44. if connection.rwInfo == nil {
  45. return nil
  46. }
  47. RWUtil.removeStandby(connection)
  48. err := connection.reconnect()
  49. if err != nil {
  50. return err
  51. }
  52. connection.rwInfo.cleanup()
  53. connection.rwInfo.rwCounter = getRwCounterInstance(connection, connection.StandbyCount)
  54. err = RWUtil.connectStandby(connection)
  55. return err
  56. }
  57. func (RWUtil rwUtil) recoverStandby(connection *DmConnection) error {
  58. if connection.closed.IsSet() || RWUtil.isStandbyAlive(connection) {
  59. return nil
  60. }
  61. ts := time.Now().UnixNano() / 1000000
  62. freq := int64(connection.dmConnector.rwStandbyRecoverTime)
  63. if freq <= 0 || ts-connection.rwInfo.tryRecoverTs < freq {
  64. return nil
  65. }
  66. err := RWUtil.connectStandby(connection)
  67. connection.rwInfo.tryRecoverTs = ts
  68. return err
  69. }
  70. func (RWUtil rwUtil) connectStandby(connection *DmConnection) error {
  71. var err error
  72. db, err := RWUtil.chooseValidStandby(connection)
  73. if err != nil {
  74. return err
  75. }
  76. if db == nil {
  77. return nil
  78. }
  79. standbyConnectorValue := *connection.dmConnector
  80. standbyConnector := &standbyConnectorValue
  81. standbyConnector.host = db.host
  82. standbyConnector.port = db.port
  83. standbyConnector.rwStandby = true
  84. standbyConnector.group = nil
  85. standbyConnector.loginMode = LOGIN_MODE_STANDBY_ONLY
  86. standbyConnector.switchTimes = 0
  87. connection.rwInfo.connStandby, err = standbyConnector.connectSingle(context.Background())
  88. if err != nil {
  89. return err
  90. }
  91. if connection.rwInfo.connStandby.SvrMode != SERVER_MODE_STANDBY || connection.rwInfo.connStandby.SvrStat != SERVER_STATUS_OPEN {
  92. RWUtil.removeStandby(connection)
  93. }
  94. return nil
  95. }
  96. func (RWUtil rwUtil) chooseValidStandby(connection *DmConnection) (*ep, error) {
  97. stmt, rs, err := connection.driverQuery(SQL_SELECT_STANDBY2)
  98. if err != nil {
  99. stmt, rs, err = connection.driverQuery(SQL_SELECT_STANDBY)
  100. }
  101. defer func() {
  102. if rs != nil {
  103. rs.close()
  104. }
  105. if stmt != nil {
  106. stmt.close()
  107. }
  108. }()
  109. if err == nil {
  110. count := int32(rs.CurrentRows.getRowCount())
  111. if count > 0 {
  112. connection.rwInfo.rwCounter = getRwCounterInstance(connection, count)
  113. i := int32(0)
  114. rowIndex := connection.rwInfo.rwCounter.random(count)
  115. dest := make([]driver.Value, 3)
  116. for err := rs.next(dest); err != io.EOF; err = rs.next(dest) {
  117. if i == rowIndex {
  118. ep := newEP(dest[1].(string), dest[2].(int32))
  119. return ep, nil
  120. }
  121. i++
  122. }
  123. }
  124. }
  125. if err != nil {
  126. return nil, errors.New("choose valid standby error!" + err.Error())
  127. }
  128. return nil, nil
  129. }
  130. func (RWUtil rwUtil) afterExceptionOnStandby(connection *DmConnection, e error) {
  131. if e.(*DmError).ErrCode == ECGO_COMMUNITION_ERROR.ErrCode {
  132. RWUtil.removeStandby(connection)
  133. }
  134. }
  135. func (RWUtil rwUtil) removeStandby(connection *DmConnection) {
  136. if connection.rwInfo.connStandby != nil {
  137. connection.rwInfo.connStandby.close()
  138. connection.rwInfo.connStandby = nil
  139. }
  140. }
  141. func (RWUtil rwUtil) isCreateStandbyStmt(stmt *DmStatement) bool {
  142. return stmt != nil && stmt.rwInfo.readOnly && RWUtil.isStandbyAlive(stmt.dmConn)
  143. }
  144. func (RWUtil rwUtil) executeByConn(conn *DmConnection, query string, execute1 func() (interface{}, error), execute2 func(otherConn *DmConnection) (interface{}, error)) (interface{}, error) {
  145. if err := RWUtil.recoverStandby(conn); err != nil {
  146. return nil, err
  147. }
  148. RWUtil.distributeSqlByConn(conn, query)
  149. turnToPrimary := false
  150. ret, err := execute1()
  151. if err != nil {
  152. if conn.rwInfo.connCurrent == conn.rwInfo.connStandby {
  153. RWUtil.afterExceptionOnStandby(conn, err)
  154. turnToPrimary = true
  155. } else {
  156. return nil, err
  157. }
  158. }
  159. curConn := conn.rwInfo.connCurrent
  160. var otherConn *DmConnection
  161. if curConn != conn {
  162. otherConn = conn
  163. } else {
  164. otherConn = conn.rwInfo.connStandby
  165. }
  166. switch curConn.lastExecInfo.retSqlType {
  167. case Dm_build_1063, Dm_build_1064, Dm_build_1068, Dm_build_1075, Dm_build_1074, Dm_build_1066:
  168. {
  169. if otherConn != nil {
  170. execute2(otherConn)
  171. }
  172. }
  173. case Dm_build_1073:
  174. {
  175. sqlhead := regexp.MustCompile("[ (]").Split(strings.TrimSpace(query), 2)[0]
  176. if util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_PARA_VALUE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_SESSION_READONLY") {
  177. if otherConn != nil {
  178. execute2(otherConn)
  179. }
  180. }
  181. }
  182. case Dm_build_1072:
  183. {
  184. if conn.dmConnector.rwHA && curConn == conn.rwInfo.connStandby &&
  185. (curConn.lastExecInfo.rsDatas == nil || len(curConn.lastExecInfo.rsDatas) == 0) {
  186. turnToPrimary = true
  187. }
  188. }
  189. }
  190. if turnToPrimary {
  191. conn.rwInfo.toPrimary()
  192. conn.rwInfo.connCurrent = conn
  193. return execute2(conn)
  194. }
  195. return ret, nil
  196. }
  197. func (RWUtil rwUtil) executeByStmt(stmt *DmStatement, execute1 func() (interface{}, error), execute2 func(otherStmt *DmStatement) (interface{}, error)) (interface{}, error) {
  198. orgStmt := stmt.rwInfo.stmtCurrent
  199. query := stmt.nativeSql
  200. if err := RWUtil.recoverStandby(stmt.dmConn); err != nil {
  201. return nil, err
  202. }
  203. RWUtil.distributeSqlByStmt(stmt)
  204. if orgStmt != stmt.rwInfo.stmtCurrent {
  205. RWUtil.copyStatement(orgStmt, stmt.rwInfo.stmtCurrent)
  206. stmt.rwInfo.stmtCurrent.nativeSql = orgStmt.nativeSql
  207. }
  208. turnToPrimary := false
  209. ret, err := execute1()
  210. if err != nil {
  211. if stmt.rwInfo.stmtCurrent == stmt.rwInfo.stmtStandby {
  212. RWUtil.afterExceptionOnStandby(stmt.dmConn, err)
  213. turnToPrimary = true
  214. } else {
  215. return nil, err
  216. }
  217. }
  218. curStmt := stmt.rwInfo.stmtCurrent
  219. var otherStmt *DmStatement
  220. if curStmt != stmt {
  221. otherStmt = stmt
  222. } else {
  223. otherStmt = stmt.rwInfo.stmtStandby
  224. }
  225. switch curStmt.execInfo.retSqlType {
  226. case Dm_build_1063, Dm_build_1064, Dm_build_1068, Dm_build_1075, Dm_build_1074, Dm_build_1066:
  227. {
  228. if otherStmt != nil {
  229. RWUtil.copyStatement(curStmt, otherStmt)
  230. execute2(otherStmt)
  231. }
  232. }
  233. case Dm_build_1073:
  234. {
  235. var tmpsql string
  236. if query != "" {
  237. tmpsql = strings.TrimSpace(query)
  238. } else if stmt.nativeSql != "" {
  239. tmpsql = strings.TrimSpace(stmt.nativeSql)
  240. } else {
  241. tmpsql = ""
  242. }
  243. sqlhead := regexp.MustCompile("[ (]").Split(tmpsql, 2)[0]
  244. if util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_PARA_VALUE") || util.StringUtil.EqualsIgnoreCase(sqlhead, "SP_SET_SESSION_READONLY") {
  245. if otherStmt != nil {
  246. RWUtil.copyStatement(curStmt, otherStmt)
  247. execute2(otherStmt)
  248. }
  249. }
  250. }
  251. case Dm_build_1072:
  252. {
  253. if stmt.dmConn.dmConnector.rwHA && curStmt == stmt.rwInfo.stmtStandby &&
  254. (curStmt.execInfo.rsDatas == nil || len(curStmt.execInfo.rsDatas) == 0) {
  255. turnToPrimary = true
  256. }
  257. }
  258. }
  259. if turnToPrimary {
  260. stmt.dmConn.rwInfo.toPrimary()
  261. stmt.rwInfo.stmtCurrent = stmt
  262. RWUtil.copyStatement(stmt.rwInfo.stmtStandby, stmt)
  263. return execute2(stmt)
  264. }
  265. return ret, nil
  266. }
  267. func (RWUtil rwUtil) checkReadonlyByConn(conn *DmConnection, sql string) bool {
  268. readonly := true
  269. if sql != "" && !conn.dmConnector.rwIgnoreSql {
  270. tmpsql := strings.TrimSpace(sql)
  271. sqlhead := strings.SplitN(tmpsql, " ", 2)[0]
  272. if util.StringUtil.EqualsIgnoreCase(sqlhead, "INSERT") ||
  273. util.StringUtil.EqualsIgnoreCase(sqlhead, "UPDATE") ||
  274. util.StringUtil.EqualsIgnoreCase(sqlhead, "DELETE") ||
  275. util.StringUtil.EqualsIgnoreCase(sqlhead, "CREATE") ||
  276. util.StringUtil.EqualsIgnoreCase(sqlhead, "TRUNCATE") ||
  277. util.StringUtil.EqualsIgnoreCase(sqlhead, "DROP") ||
  278. util.StringUtil.EqualsIgnoreCase(sqlhead, "ALTER") {
  279. readonly = false
  280. } else {
  281. readonly = true
  282. }
  283. }
  284. return readonly
  285. }
  286. func (RWUtil rwUtil) checkReadonlyByStmt(stmt *DmStatement) bool {
  287. return RWUtil.checkReadonlyByConn(stmt.dmConn, stmt.nativeSql)
  288. }
  289. func (RWUtil rwUtil) distributeSqlByConn(conn *DmConnection, query string) RWSiteEnum {
  290. var dest RWSiteEnum
  291. if !RWUtil.isStandbyAlive(conn) {
  292. dest = conn.rwInfo.toPrimary()
  293. } else if !RWUtil.checkReadonlyByConn(conn, query) {
  294. dest = conn.rwInfo.toPrimary()
  295. } else if (conn.rwInfo.distribute == PRIMARY && !conn.trxFinish) ||
  296. (conn.rwInfo.distribute == STANDBY && !conn.rwInfo.connStandby.trxFinish) {
  297. dest = conn.rwInfo.distribute
  298. } else if conn.IsoLevel != int32(sql.LevelSerializable) {
  299. dest = conn.rwInfo.toAny()
  300. } else {
  301. dest = conn.rwInfo.toPrimary()
  302. }
  303. if dest == PRIMARY {
  304. conn.rwInfo.connCurrent = conn
  305. } else {
  306. conn.rwInfo.connCurrent = conn.rwInfo.connStandby
  307. }
  308. return dest
  309. }
  310. func (RWUtil rwUtil) distributeSqlByStmt(stmt *DmStatement) RWSiteEnum {
  311. var dest RWSiteEnum
  312. if !RWUtil.isStandbyAlive(stmt.dmConn) {
  313. dest = stmt.dmConn.rwInfo.toPrimary()
  314. } else if !RWUtil.checkReadonlyByStmt(stmt) {
  315. dest = stmt.dmConn.rwInfo.toPrimary()
  316. } else if (stmt.dmConn.rwInfo.distribute == PRIMARY && !stmt.dmConn.trxFinish) ||
  317. (stmt.dmConn.rwInfo.distribute == STANDBY && !stmt.dmConn.rwInfo.connStandby.trxFinish) {
  318. dest = stmt.dmConn.rwInfo.distribute
  319. } else if stmt.dmConn.IsoLevel != int32(sql.LevelSerializable) {
  320. dest = stmt.dmConn.rwInfo.toAny()
  321. } else {
  322. dest = stmt.dmConn.rwInfo.toPrimary()
  323. }
  324. if dest == STANDBY && !RWUtil.isStandbyStatementValid(stmt) {
  325. var err error
  326. stmt.rwInfo.stmtStandby, err = stmt.dmConn.rwInfo.connStandby.prepare(stmt.nativeSql)
  327. if err != nil {
  328. dest = stmt.dmConn.rwInfo.toPrimary()
  329. }
  330. }
  331. if dest == PRIMARY {
  332. stmt.rwInfo.stmtCurrent = stmt
  333. } else {
  334. stmt.rwInfo.stmtCurrent = stmt.rwInfo.stmtStandby
  335. }
  336. return dest
  337. }
  338. func (RWUtil rwUtil) isStandbyAlive(connection *DmConnection) bool {
  339. return connection.rwInfo.connStandby != nil && !connection.rwInfo.connStandby.closed.IsSet()
  340. }
  341. func (RWUtil rwUtil) isStandbyStatementValid(statement *DmStatement) bool {
  342. return statement.rwInfo.stmtStandby != nil && !statement.rwInfo.stmtStandby.closed
  343. }
  344. func (RWUtil rwUtil) copyStatement(srcStmt *DmStatement, destStmt *DmStatement) {
  345. destStmt.nativeSql = srcStmt.nativeSql
  346. destStmt.serverParams = srcStmt.serverParams
  347. destStmt.bindParams = srcStmt.bindParams
  348. destStmt.paramCount = srcStmt.paramCount
  349. }