zh.go 17 KB


  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "bytes"
  8. "context"
  9. "database/sql/driver"
  10. "fmt"
  11. "reflect"
  12. "time"
  13. )
  14. type statFilter struct {
  15. }
  16. //DmDriver
  17. func (sf *statFilter) DmDriverOpen(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnection, error) {
  18. conn, err := filterChain.DmDriverOpen(d, dsn)
  19. if err != nil {
  20. return nil, err
  21. }
  22. conn.statInfo.init(conn)
  23. conn.statInfo.setConstructNano()
  24. conn.statInfo.getConnStat().incrementConn()
  25. return conn, nil
  26. }
  27. func (sf *statFilter) DmDriverOpenConnector(filterChain *filterChain, d *DmDriver, dsn string) (*DmConnector, error) {
  28. return filterChain.DmDriverOpenConnector(d, dsn)
  29. }
  30. //DmConnector
  31. func (sf *statFilter) DmConnectorConnect(filterChain *filterChain, c *DmConnector, ctx context.Context) (*DmConnection, error) {
  32. conn, err := filterChain.DmConnectorConnect(c, ctx)
  33. if err != nil {
  34. return nil, err
  35. }
  36. conn.statInfo.init(conn)
  37. conn.statInfo.setConstructNano()
  38. conn.statInfo.getConnStat().incrementConn()
  39. return conn, nil
  40. }
  41. func (sf *statFilter) DmConnectorDriver(filterChain *filterChain, c *DmConnector) *DmDriver {
  42. return filterChain.DmConnectorDriver(c)
  43. }
  44. //DmConnection
  45. func (sf *statFilter) DmConnectionBegin(filterChain *filterChain, c *DmConnection) (*DmConnection, error) {
  46. return filterChain.DmConnectionBegin(c)
  47. }
  48. func (sf *statFilter) DmConnectionBeginTx(filterChain *filterChain, c *DmConnection, ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
  49. return filterChain.DmConnectionBeginTx(c, ctx, opts)
  50. }
  51. func (sf *statFilter) DmConnectionCommit(filterChain *filterChain, c *DmConnection) error {
  52. err := filterChain.DmConnectionCommit(c)
  53. if err != nil {
  54. return err
  55. }
  56. c.statInfo.getConnStat().incrementCommitCount()
  57. return nil
  58. }
  59. func (sf *statFilter) DmConnectionRollback(filterChain *filterChain, c *DmConnection) error {
  60. err := filterChain.DmConnectionRollback(c)
  61. if err != nil {
  62. return err
  63. }
  64. c.statInfo.getConnStat().incrementRollbackCount()
  65. return nil
  66. }
  67. func (sf *statFilter) DmConnectionClose(filterChain *filterChain, c *DmConnection) error {
  68. if !c.closed.IsSet() {
  69. c.statInfo.getConnStat().decrementStmtByActiveStmtCount(int64(getActiveStmtCount(c)))
  70. c.statInfo.getConnStat().decrementConn()
  71. }
  72. return filterChain.DmConnectionClose(c)
  73. }
  74. func (sf *statFilter) DmConnectionPing(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  75. return c.ping(ctx)
  76. }
  77. func (sf *statFilter) DmConnectionExec(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmResult, error) {
  78. connExecBefore(c, query)
  79. dr, err := filterChain.DmConnectionExec(c, query, args)
  80. if err != nil {
  81. connExecuteErrorAfter(c, args, err)
  82. return nil, err
  83. }
  84. connExecAfter(c, query, args, int(dr.affectedRows))
  85. return dr, nil
  86. }
  87. func (sf *statFilter) DmConnectionExecContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
  88. connExecBefore(c, query)
  89. dr, err := filterChain.DmConnectionExecContext(c, ctx, query, args)
  90. if err != nil {
  91. connExecuteErrorAfter(c, args, err)
  92. return nil, err
  93. }
  94. connExecAfter(c, query, args, int(dr.affectedRows))
  95. return dr, nil
  96. }
  97. func (sf *statFilter) DmConnectionQuery(filterChain *filterChain, c *DmConnection, query string, args []driver.Value) (*DmRows, error) {
  98. connQueryBefore(c, query)
  99. dr, err := filterChain.DmConnectionQuery(c, query, args)
  100. if err != nil {
  101. connExecuteErrorAfter(c, args, err)
  102. return nil, err
  103. }
  104. connQueryAfter(c, query, args, dr)
  105. return dr, nil
  106. }
  107. func (sf *statFilter) DmConnectionQueryContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
  108. connQueryBefore(c, query)
  109. dr, err := filterChain.DmConnectionQueryContext(c, ctx, query, args)
  110. if err != nil {
  111. connExecuteErrorAfter(c, args, err)
  112. return nil, err
  113. }
  114. connQueryAfter(c, query, args, dr)
  115. return dr, nil
  116. }
  117. func (sf *statFilter) DmConnectionPrepare(filterChain *filterChain, c *DmConnection, query string) (*DmStatement, error) {
  118. stmt, err := filterChain.DmConnectionPrepare(c, query)
  119. if err != nil {
  120. return nil, err
  121. }
  122. statementCreateAfter(c, stmt)
  123. return stmt, nil
  124. }
  125. func (sf *statFilter) DmConnectionPrepareContext(filterChain *filterChain, c *DmConnection, ctx context.Context, query string) (*DmStatement, error) {
  126. stmt, err := filterChain.DmConnectionPrepareContext(c, ctx, query)
  127. if err != nil {
  128. return nil, err
  129. }
  130. statementCreateAfter(c, stmt)
  131. return stmt, nil
  132. }
  133. func (sf *statFilter) DmConnectionResetSession(filterChain *filterChain, c *DmConnection, ctx context.Context) error {
  134. return filterChain.DmConnectionResetSession(c, ctx)
  135. }
  136. func (sf *statFilter) DmConnectionCheckNamedValue(filterChain *filterChain, c *DmConnection, nv *driver.NamedValue) error {
  137. return filterChain.DmConnectionCheckNamedValue(c, nv)
  138. }
  139. //DmStatement
  140. func (sf *statFilter) DmStatementClose(filterChain *filterChain, s *DmStatement) error {
  141. if !s.closed {
  142. statementCloseBefore(s)
  143. }
  144. return filterChain.DmStatementClose(s)
  145. }
  146. func (sf *statFilter) DmStatementNumInput(filterChain *filterChain, s *DmStatement) int {
  147. return filterChain.DmStatementNumInput(s)
  148. }
  149. func (sf *statFilter) DmStatementExec(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmResult, error) {
  150. stmtExecBefore(s)
  151. dr, err := filterChain.DmStatementExec(s, args)
  152. if err != nil {
  153. statementExecuteErrorAfter(s, args, err)
  154. return nil, err
  155. }
  156. stmtExecAfter(s, args, int(dr.affectedRows))
  157. return dr, nil
  158. }
  159. func (sf *statFilter) DmStatementExecContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmResult, error) {
  160. stmtExecBefore(s)
  161. dr, err := filterChain.DmStatementExecContext(s, ctx, args)
  162. if err != nil {
  163. statementExecuteErrorAfter(s, args, err)
  164. return nil, err
  165. }
  166. stmtExecAfter(s, args, int(dr.affectedRows))
  167. return dr, nil
  168. }
  169. func (sf *statFilter) DmStatementQuery(filterChain *filterChain, s *DmStatement, args []driver.Value) (*DmRows, error) {
  170. stmtQueryBefore(s)
  171. dr, err := filterChain.DmStatementQuery(s, args)
  172. if err != nil {
  173. statementExecuteErrorAfter(s, args, err)
  174. return nil, err
  175. }
  176. stmtQueryAfter(s, args, dr)
  177. return dr, nil
  178. }
  179. func (sf *statFilter) DmStatementQueryContext(filterChain *filterChain, s *DmStatement, ctx context.Context, args []driver.NamedValue) (*DmRows, error) {
  180. stmtQueryBefore(s)
  181. dr, err := filterChain.DmStatementQueryContext(s, ctx, args)
  182. if err != nil {
  183. statementExecuteErrorAfter(s, args, err)
  184. return nil, err
  185. }
  186. stmtQueryAfter(s, args, dr)
  187. return dr, nil
  188. }
  189. func (sf *statFilter) DmStatementCheckNamedValue(filterChain *filterChain, s *DmStatement, nv *driver.NamedValue) error {
  190. return filterChain.DmStatementCheckNamedValue(s, nv)
  191. }
  192. //DmResult
  193. func (sf *statFilter) DmResultLastInsertId(filterChain *filterChain, r *DmResult) (int64, error) {
  194. return filterChain.DmResultLastInsertId(r)
  195. }
  196. func (sf *statFilter) DmResultRowsAffected(filterChain *filterChain, r *DmResult) (int64, error) {
  197. return filterChain.DmResultRowsAffected(r)
  198. }
  199. //DmRows
  200. func (sf *statFilter) DmRowsColumns(filterChain *filterChain, r *DmRows) []string {
  201. return filterChain.DmRowsColumns(r)
  202. }
  203. func (sf *statFilter) DmRowsClose(filterChain *filterChain, r *DmRows) error {
  204. if !r.CurrentRows.closed {
  205. resultSetCloseBefore(r)
  206. }
  207. return filterChain.DmRowsClose(r)
  208. }
  209. func (sf *statFilter) DmRowsNext(filterChain *filterChain, r *DmRows, dest []driver.Value) error {
  210. return filterChain.DmRowsNext(r, dest)
  211. }
  212. func (sf *statFilter) DmRowsHasNextResultSet(filterChain *filterChain, r *DmRows) bool {
  213. return filterChain.DmRowsHasNextResultSet(r)
  214. }
  215. func (sf *statFilter) DmRowsNextResultSet(filterChain *filterChain, r *DmRows) error {
  216. return filterChain.DmRowsNextResultSet(r)
  217. }
  218. func (sf *statFilter) DmRowsColumnTypeScanType(filterChain *filterChain, r *DmRows, index int) reflect.Type {
  219. return filterChain.DmRowsColumnTypeScanType(r, index)
  220. }
  221. func (sf *statFilter) DmRowsColumnTypeDatabaseTypeName(filterChain *filterChain, r *DmRows, index int) string {
  222. return filterChain.DmRowsColumnTypeDatabaseTypeName(r, index)
  223. }
  224. func (sf *statFilter) DmRowsColumnTypeLength(filterChain *filterChain, r *DmRows, index int) (length int64, ok bool) {
  225. return filterChain.DmRowsColumnTypeLength(r, index)
  226. }
  227. func (sf *statFilter) DmRowsColumnTypeNullable(filterChain *filterChain, r *DmRows, index int) (nullable, ok bool) {
  228. return filterChain.DmRowsColumnTypeNullable(r, index)
  229. }
  230. func (sf *statFilter) DmRowsColumnTypePrecisionScale(filterChain *filterChain, r *DmRows, index int) (precision, scale int64, ok bool) {
  231. return filterChain.DmRowsColumnTypePrecisionScale(r, index)
  232. }
  233. func getActiveStmtCount(conn *DmConnection) int {
  234. if conn.stmtMap == nil {
  235. return 0
  236. } else {
  237. return len(conn.stmtMap)
  238. }
  239. }
  240. func statementCreateAfter(conn *DmConnection, stmt *DmStatement) {
  241. stmt.statInfo.setConstructNano()
  242. conn.statInfo.getConnStat().incrementStmt()
  243. }
  244. func connExecBefore(conn *DmConnection, sql string) {
  245. conn.statInfo.setLastExecuteSql(sql)
  246. conn.statInfo.setFirstResultSet(false)
  247. conn.statInfo.setLastExecuteType(ExecuteUpdate)
  248. internalBeforeConnExecute(conn, sql)
  249. }
  250. func connExecAfter(conn *DmConnection, sql string, args interface{}, updateCount int) {
  251. internalAfterConnExecute(conn, args, updateCount)
  252. }
  253. func connQueryBefore(conn *DmConnection, sql string) {
  254. conn.statInfo.setLastExecuteSql(sql)
  255. conn.statInfo.setFirstResultSet(true)
  256. conn.statInfo.setLastExecuteType(ExecuteQuery)
  257. internalBeforeConnExecute(conn, sql)
  258. }
  259. func connQueryAfter(conn *DmConnection, sql string, args interface{}, resultSet *DmRows) {
  260. if resultSet != nil {
  261. connResultSetCreateAfter(resultSet, conn)
  262. }
  263. internalAfterConnExecute(conn, args, 0)
  264. }
  265. func stmtExecBefore(stmt *DmStatement) {
  266. stmt.statInfo.setLastExecuteSql(stmt.nativeSql)
  267. stmt.statInfo.setFirstResultSet(false)
  268. stmt.statInfo.setLastExecuteType(ExecuteUpdate)
  269. internalBeforeStatementExecute(stmt, stmt.nativeSql)
  270. }
  271. func stmtExecAfter(stmt *DmStatement, args interface{}, updateCount int) {
  272. internalAfterStatementExecute(stmt, args, updateCount)
  273. }
  274. func stmtQueryBefore(stmt *DmStatement) {
  275. stmt.statInfo.setLastExecuteSql(stmt.nativeSql)
  276. stmt.statInfo.setFirstResultSet(true)
  277. stmt.statInfo.setLastExecuteType(ExecuteQuery)
  278. internalBeforeStatementExecute(stmt, stmt.nativeSql)
  279. }
  280. func stmtQueryAfter(stmt *DmStatement, args interface{}, resultSet *DmRows) {
  281. if resultSet != nil {
  282. stmtResultSetCreateAfter(resultSet, stmt)
  283. }
  284. internalAfterStatementExecute(stmt, args, 0)
  285. }
  286. func internalBeforeConnExecute(conn *DmConnection, sql string) {
  287. connStat := conn.statInfo.getConnStat()
  288. connStat.incrementExecuteCount()
  289. conn.statInfo.beforeExecute()
  290. sqlStat := conn.statInfo.getSqlStat()
  291. if sqlStat == nil || sqlStat.Removed == 1 || !(sqlStat.Sql == sql) {
  292. sqlStat = connStat.createSqlStat(sql)
  293. conn.statInfo.setSqlStat(sqlStat)
  294. }
  295. inTransaction := false
  296. inTransaction = !conn.autoCommit
  297. if sqlStat != nil {
  298. sqlStat.ExecuteLastStartTime = time.Now().UnixNano()
  299. sqlStat.incrementRunningCount()
  300. if inTransaction {
  301. sqlStat.incrementInTransactionCount()
  302. }
  303. }
  304. }
  305. func internalAfterConnExecute(conn *DmConnection, args interface{}, updateCount int) {
  306. nowNano := time.Now().UnixNano()
  307. nanos := nowNano - conn.statInfo.getLastExecuteStartNano()
  308. conn.statInfo.afterExecute(nanos)
  309. sqlStat := conn.statInfo.getSqlStat()
  310. if sqlStat != nil {
  311. sqlStat.incrementExecuteSuccessCount()
  312. sqlStat.decrementRunningCount()
  313. parameters := buildSlowParameters(args)
  314. sqlStat.addExecuteTimeAndResultHoldTimeHistogramRecord(conn.statInfo.getLastExecuteType(), conn.statInfo.isFirstResultSet(),
  315. nanos, parameters)
  316. if !conn.statInfo.isFirstResultSet() &&
  317. conn.statInfo.getLastExecuteType() == ExecuteUpdate {
  318. if updateCount < 0 {
  319. updateCount = 0
  320. }
  321. sqlStat.addUpdateCount(int64(updateCount))
  322. }
  323. }
  324. }
  325. func internalBeforeStatementExecute(stmt *DmStatement, sql string) {
  326. connStat := stmt.dmConn.statInfo.getConnStat()
  327. connStat.incrementExecuteCount()
  328. stmt.statInfo.beforeExecute()
  329. sqlStat := stmt.statInfo.getSqlStat()
  330. if sqlStat == nil || sqlStat.Removed == 1 || !(sqlStat.Sql == sql) {
  331. sqlStat = connStat.createSqlStat(sql)
  332. stmt.statInfo.setSqlStat(sqlStat)
  333. }
  334. inTransaction := false
  335. inTransaction = !stmt.dmConn.autoCommit
  336. if sqlStat != nil {
  337. sqlStat.ExecuteLastStartTime = time.Now().UnixNano()
  338. sqlStat.incrementRunningCount()
  339. if inTransaction {
  340. sqlStat.incrementInTransactionCount()
  341. }
  342. }
  343. }
  344. func internalAfterStatementExecute(stmt *DmStatement, args interface{}, updateCount int) {
  345. nowNano := time.Now().UnixNano()
  346. nanos := nowNano - stmt.statInfo.getLastExecuteStartNano()
  347. stmt.statInfo.afterExecute(nanos)
  348. sqlStat := stmt.statInfo.getSqlStat()
  349. if sqlStat != nil {
  350. sqlStat.incrementExecuteSuccessCount()
  351. sqlStat.decrementRunningCount()
  352. parameters := ""
  353. if stmt.paramCount > 0 {
  354. parameters = buildStmtSlowParameters(stmt, args)
  355. }
  356. sqlStat.addExecuteTimeAndResultHoldTimeHistogramRecord(stmt.statInfo.getLastExecuteType(), stmt.statInfo.isFirstResultSet(),
  357. nanos, parameters)
  358. if (!stmt.statInfo.isFirstResultSet()) &&
  359. stmt.statInfo.getLastExecuteType() == ExecuteUpdate {
  360. updateCount := stmt.execInfo.updateCount
  361. if updateCount < 0 {
  362. updateCount = 0
  363. }
  364. sqlStat.addUpdateCount(updateCount)
  365. }
  366. }
  367. }
  368. func buildSlowParameters(args interface{}) string {
  369. switch v := args.(type) {
  370. case []driver.Value:
  371. sb := bytes.NewBufferString("")
  372. for i := 0; i < len(v); i++ {
  373. if i != 0 {
  374. sb.WriteString(",")
  375. } else {
  376. sb.WriteString("[")
  377. }
  378. sb.WriteString(fmt.Sprint(v[i]))
  379. }
  380. if len(v) > 0 {
  381. sb.WriteString("]")
  382. }
  383. return sb.String()
  384. case []driver.NamedValue:
  385. sb := bytes.NewBufferString("")
  386. for i := 0; i < len(v); i++ {
  387. if i != 0 {
  388. sb.WriteString(",")
  389. } else {
  390. sb.WriteString("[")
  391. }
  392. sb.WriteString(fmt.Sprint(v[i]))
  393. }
  394. if len(v) > 0 {
  395. sb.WriteString("]")
  396. }
  397. return sb.String()
  398. default:
  399. return ""
  400. }
  401. }
  402. func buildStmtSlowParameters(stmt *DmStatement, args interface{}) string {
  403. switch v := args.(type) {
  404. case []driver.Value:
  405. sb := bytes.NewBufferString("")
  406. for i := 0; i < int(stmt.paramCount); i++ {
  407. if i != 0 {
  408. sb.WriteString(",")
  409. } else {
  410. sb.WriteString("[")
  411. }
  412. sb.WriteString(fmt.Sprint(v[i]))
  413. }
  414. if len(v) > 0 {
  415. sb.WriteString("]")
  416. }
  417. return sb.String()
  418. case []driver.NamedValue:
  419. sb := bytes.NewBufferString("")
  420. for i := 0; i < int(stmt.paramCount); i++ {
  421. if i != 0 {
  422. sb.WriteString(",")
  423. } else {
  424. sb.WriteString("[")
  425. }
  426. sb.WriteString(fmt.Sprint(v[i]))
  427. }
  428. if len(v) > 0 {
  429. sb.WriteString("]")
  430. }
  431. return sb.String()
  432. default:
  433. return ""
  434. }
  435. }
  436. func connExecuteErrorAfter(conn *DmConnection, args interface{}, err error) {
  437. nanos := time.Now().UnixNano() - conn.statInfo.getLastExecuteStartNano()
  438. conn.statInfo.getConnStat().incrementErrorCount()
  439. conn.statInfo.afterExecute(nanos)
  440. // SQL
  441. sqlStat := conn.statInfo.getSqlStat()
  442. if sqlStat != nil {
  443. sqlStat.decrementRunningCount()
  444. sqlStat.error(err)
  445. parameters := buildSlowParameters(args)
  446. sqlStat.addExecuteTimeAndResultHoldTimeHistogramRecord(conn.statInfo.getLastExecuteType(), conn.statInfo.isFirstResultSet(),
  447. nanos, parameters)
  448. }
  449. }
  450. func statementExecuteErrorAfter(stmt *DmStatement, args interface{}, err error) {
  451. nanos := time.Now().UnixNano() - stmt.statInfo.getLastExecuteStartNano()
  452. stmt.dmConn.statInfo.getConnStat().incrementErrorCount()
  453. stmt.statInfo.afterExecute(nanos)
  454. // SQL
  455. sqlStat := stmt.statInfo.getSqlStat()
  456. if sqlStat != nil {
  457. sqlStat.decrementRunningCount()
  458. sqlStat.error(err)
  459. parameters := ""
  460. if stmt.paramCount > 0 {
  461. parameters = buildStmtSlowParameters(stmt, args)
  462. }
  463. sqlStat.addExecuteTimeAndResultHoldTimeHistogramRecord(stmt.statInfo.getLastExecuteType(), stmt.statInfo.isFirstResultSet(),
  464. nanos, parameters)
  465. }
  466. }
  467. func statementCloseBefore(stmt *DmStatement) {
  468. stmt.dmConn.statInfo.getConnStat().decrementStmt()
  469. }
  470. func connResultSetCreateAfter(dmdbResultSet *DmRows, conn *DmConnection) {
  471. dmdbResultSet.statInfo.setSql(conn.statInfo.getLastExecuteSql())
  472. dmdbResultSet.statInfo.setSqlStat(conn.statInfo.getSqlStat())
  473. dmdbResultSet.statInfo.setConstructNano()
  474. }
  475. func stmtResultSetCreateAfter(dmdbResultSet *DmRows, stmt *DmStatement) {
  476. dmdbResultSet.statInfo.setSql(stmt.statInfo.getLastExecuteSql())
  477. dmdbResultSet.statInfo.setSqlStat(stmt.statInfo.getSqlStat())
  478. dmdbResultSet.statInfo.setConstructNano()
  479. }
  480. func resultSetCloseBefore(resultSet *DmRows) {
  481. nanos := time.Now().UnixNano() - resultSet.statInfo.getConstructNano()
  482. fetchRowCount := getFetchedRows(resultSet)
  483. sqlStat := resultSet.statInfo.getSqlStat()
  484. if sqlStat != nil && resultSet.statInfo.getCloseCount() == 0 {
  485. sqlStat.addFetchRowCount(fetchRowCount)
  486. stmtExecuteNano := resultSet.statInfo.getLastExecuteTimeNano()
  487. sqlStat.addResultSetHoldTimeNano2(stmtExecuteNano, nanos)
  488. if resultSet.statInfo.getReadStringLength() > 0 {
  489. sqlStat.addStringReadLength(resultSet.statInfo.getReadStringLength())
  490. }
  491. if resultSet.statInfo.getReadBytesLength() > 0 {
  492. sqlStat.addReadBytesLength(resultSet.statInfo.getReadBytesLength())
  493. }
  494. if resultSet.statInfo.getOpenInputStreamCount() > 0 {
  495. sqlStat.addInputStreamOpenCount(int64(resultSet.statInfo.getOpenInputStreamCount()))
  496. }
  497. if resultSet.statInfo.getOpenReaderCount() > 0 {
  498. sqlStat.addReaderOpenCount(int64(resultSet.statInfo.getOpenReaderCount()))
  499. }
  500. }
  501. resultSet.statInfo.incrementCloseCount()
  502. }
  503. func getFetchedRows(rs *DmRows) int64 {
  504. if rs.CurrentRows.currentPos >= rs.CurrentRows.totalRowCount {
  505. return rs.CurrentRows.totalRowCount
  506. } else {
  507. return rs.CurrentRows.currentPos + 1
  508. }
  509. }