zg.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "context"
  8. "database/sql/driver"
  9. "reflect"
  10. )
  11. type rwFilter struct {
  12. }
  13. //DmDriver
  14. func (rwf *rwFilter) DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error) {
  15. return filterChain.DmDriverOpen(d, dsn)
  16. }
  17. func (rwf *rwFilter) DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error) {
  18. return filterChain.DmDriverOpenConnector(d, dsn)
  19. }
  20. //DmConnector
  21. func (rwf *rwFilter) DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error) {
  22. return RWUtil.connect(c, ctx)
  23. }
  24. func (rwf *rwFilter) DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver {
  25. return filterChain.DmConnectorDriver(c)
  26. }
  27. //DmConnection
  28. func (rwf *rwFilter) DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error) {
  29. if RWUtil.isStandbyAlive(c) {
  30. _, err := c.rwInfo.connStandby.begin()
  31. if err != nil {
  32. RWUtil.afterExceptionOnStandby(c, err)
  33. }
  34. }
  35. return filterChain.DmConnectionBegin(c)
  36. }
  37. func (rwf *rwFilter) DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
  38. if RWUtil.isStandbyAlive(c) {
  39. _, err := c.rwInfo.connStandby.beginTx(ctx, opts)
  40. if err != nil {
  41. RWUtil.afterExceptionOnStandby(c, err)
  42. }
  43. }
  44. return filterChain.DmConnectionBeginTx(c, ctx, opts)
  45. }
  46. func (rwf *rwFilter) DmConnectionCommit(filterChain *filterChain, c *DmConnection) error {
  47. if RWUtil.isStandbyAlive(c) {
  48. err := c.rwInfo.connStandby.commit()
  49. if err != nil {
  50. RWUtil.afterExceptionOnStandby(c, err)
  51. }
  52. }
  53. return filterChain.DmConnectionCommit(c)
  54. }
  55. func (rwf *rwFilter) DmConnectionRollback(filterChain *filterChain, c *DmConnection) error {
  56. if RWUtil.isStandbyAlive(c) {
  57. err := c.rwInfo.connStandby.rollback()
  58. if err != nil {
  59. RWUtil.afterExceptionOnStandby(c, err)
  60. }
  61. }
  62. return filterChain.DmConnectionRollback(c)
  63. }
  64. func (rwf *rwFilter) DmConnectionClose(filterChain *filterChain, c *DmConnection) error {
  65. if RWUtil.isStandbyAlive(c) {
  66. err := c.rwInfo.connStandby.close()
  67. if err != nil {
  68. RWUtil.afterExceptionOnStandby(c, err)
  69. }
  70. }
  71. return filterChain.DmConnectionClose(c)
  72. }
  73. func (rwf *rwFilter) DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  74. return filterChain.DmConnectionPing(c, ctx)
  75. }
  76. func (rwf *rwFilter) DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error) {
  77. ret, err := RWUtil.executeByConn(c, query, func() (interface{}, error) {
  78. return c.rwInfo.connCurrent.exec(query, args)
  79. }, func(otherConn *DmConnection) (interface{}, error) {
  80. return otherConn.exec(query, args)
  81. })
  82. if err != nil {
  83. return nil, err
  84. }
  85. return ret.(*DmResult), nil
  86. }
  87. func (rwf *rwFilter) DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
  88. ret, err := RWUtil.executeByConn(c, query, func() (interface{}, error) {
  89. return c.rwInfo.connCurrent.execContext(ctx, query, args)
  90. }, func(otherConn *DmConnection) (interface{}, error) {
  91. return otherConn.execContext(ctx, query, args)
  92. })
  93. if err != nil {
  94. return nil, err
  95. }
  96. return ret.(*DmResult), nil
  97. }
  98. func (rwf *rwFilter) DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error) {
  99. ret, err := RWUtil.executeByConn(c, query, func() (interface{}, error) {
  100. return c.rwInfo.connCurrent.query(query, args)
  101. }, func(otherConn *DmConnection) (interface{}, error) {
  102. return otherConn.query(query, args)
  103. })
  104. if err != nil {
  105. return nil, err
  106. }
  107. return ret.(*DmRows), nil
  108. }
  109. func (rwf *rwFilter) DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
  110. ret, err := RWUtil.executeByConn(c, query, func() (interface{}, error) {
  111. return c.rwInfo.connCurrent.queryContext(ctx, query, args)
  112. }, func(otherConn *DmConnection) (interface{}, error) {
  113. return otherConn.queryContext(ctx, query, args)
  114. })
  115. if err != nil {
  116. return nil, err
  117. }
  118. return ret.(*DmRows), nil
  119. }
  120. func (rwf *rwFilter) DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error) {
  121. stmt, err := c.prepare(query)
  122. if err != nil {
  123. return nil, err
  124. }
  125. stmt.rwInfo.stmtCurrent = stmt
  126. stmt.rwInfo.readOnly = RWUtil.checkReadonlyByStmt(stmt)
  127. if RWUtil.isCreateStandbyStmt(stmt) {
  128. stmt.rwInfo.stmtStandby, err = c.rwInfo.connStandby.prepare(query)
  129. if err == nil {
  130. stmt.rwInfo.stmtCurrent = stmt.rwInfo.stmtStandby
  131. } else {
  132. RWUtil.afterExceptionOnStandby(c, err)
  133. }
  134. }
  135. return stmt, nil
  136. }
  137. func (rwf *rwFilter) DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error) {
  138. stmt, err := c.prepareContext(ctx, query)
  139. if err != nil {
  140. return nil, err
  141. }
  142. stmt.rwInfo.stmtCurrent = stmt
  143. stmt.rwInfo.readOnly = RWUtil.checkReadonlyByStmt(stmt)
  144. if RWUtil.isCreateStandbyStmt(stmt) {
  145. stmt.rwInfo.stmtStandby, err = c.rwInfo.connStandby.prepareContext(ctx, query)
  146. if err == nil {
  147. stmt.rwInfo.stmtCurrent = stmt.rwInfo.stmtStandby
  148. } else {
  149. RWUtil.afterExceptionOnStandby(c, err)
  150. }
  151. }
  152. return stmt, nil
  153. }
  154. func (rwf *rwFilter) DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  155. if RWUtil.isStandbyAlive(c) {
  156. err := c.rwInfo.connStandby.resetSession(ctx)
  157. if err != nil {
  158. RWUtil.afterExceptionOnStandby(c, err)
  159. }
  160. }
  161. return filterChain.DmConnectionResetSession(c, ctx)
  162. }
  163. func (rwf *rwFilter) DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error {
  164. return filterChain.DmConnectionCheckNamedValue(c, nv)
  165. }
  166. //DmStatement
  167. func (rwf *rwFilter) DmStatementClose(filterChain *filterChain, s *DmStatement) error {
  168. if RWUtil.isStandbyStatementValid(s) {
  169. err := s.rwInfo.stmtStandby.close()
  170. if err != nil {
  171. RWUtil.afterExceptionOnStandby(s.dmConn, err)
  172. }
  173. }
  174. return filterChain.DmStatementClose(s)
  175. }
  176. func (rwf *rwFilter) DmStatementNumInput(filterChain *filterChain, s *DmStatement) int {
  177. return filterChain.DmStatementNumInput(s)
  178. }
  179. func (rwf *rwFilter) DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error) {
  180. ret, err := RWUtil.executeByStmt(s, func() (interface{}, error) {
  181. return s.rwInfo.stmtCurrent.exec(args)
  182. }, func(otherStmt *DmStatement) (interface{}, error) {
  183. return otherStmt.exec(args)
  184. })
  185. if err != nil {
  186. return nil, err
  187. }
  188. return ret.(*DmResult), nil
  189. }
  190. func (rwf *rwFilter) DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error) {
  191. ret, err := RWUtil.executeByStmt(s, func() (interface{}, error) {
  192. return s.rwInfo.stmtCurrent.execContext(ctx, args)
  193. }, func(otherStmt *DmStatement) (interface{}, error) {
  194. return otherStmt.execContext(ctx, args)
  195. })
  196. if err != nil {
  197. return nil, err
  198. }
  199. return ret.(*DmResult), nil
  200. }
  201. func (rwf *rwFilter) DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error) {
  202. ret, err := RWUtil.executeByStmt(s, func() (interface{}, error) {
  203. return s.rwInfo.stmtCurrent.query(args)
  204. }, func(otherStmt *DmStatement) (interface{}, error) {
  205. return otherStmt.query(args)
  206. })
  207. if err != nil {
  208. return nil, err
  209. }
  210. return ret.(*DmRows), nil
  211. }
  212. func (rwf *rwFilter) DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error) {
  213. ret, err := RWUtil.executeByStmt(s, func() (interface{}, error) {
  214. return s.rwInfo.stmtCurrent.queryContext(ctx, args)
  215. }, func(otherStmt *DmStatement) (interface{}, error) {
  216. return otherStmt.queryContext(ctx, args)
  217. })
  218. if err != nil {
  219. return nil, err
  220. }
  221. return ret.(*DmRows), nil
  222. }
  223. func (rwf *rwFilter) DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error {
  224. return filterChain.DmStatementCheckNamedValue(s, nv)
  225. }
  226. //DmResult
  227. func (rwf *rwFilter) DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error) {
  228. return filterChain.DmResultLastInsertId(r)
  229. }
  230. func (rwf *rwFilter) DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error) {
  231. return filterChain.DmResultRowsAffected(r)
  232. }
  233. //DmRows
  234. func (rwf *rwFilter) DmRowsColumns(filterChain *filterChain, r *DmRows) []string {
  235. return filterChain.DmRowsColumns(r)
  236. }
  237. func (rwf *rwFilter) DmRowsClose(filterChain *filterChain, r *DmRows) error {
  238. return filterChain.DmRowsClose(r)
  239. }
  240. func (rwf *rwFilter) DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error {
  241. return filterChain.DmRowsNext(r, dest)
  242. }
  243. func (rwf *rwFilter) DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool {
  244. return filterChain.DmRowsHasNextResultSet(r)
  245. }
  246. func (rwf *rwFilter) DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error {
  247. return filterChain.DmRowsNextResultSet(r)
  248. }
  249. func (rwf *rwFilter) DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type {
  250. return filterChain.DmRowsColumnTypeScanType(r, index)
  251. }
  252. func (rwf *rwFilter) DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string {
  253. return filterChain.DmRowsColumnTypeDatabaseTypeName(r, index)
  254. }
  255. func (rwf *rwFilter) DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool) {
  256. return filterChain.DmRowsColumnTypeLength(r, index)
  257. }
  258. func (rwf *rwFilter) DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool) {
  259. return filterChain.DmRowsColumnTypeNullable(r, index)
  260. }
  261. func (rwf *rwFilter) DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool) {
  262. return filterChain.DmRowsColumnTypePrecisionScale(r, index)
  263. }