m.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. /*
  2. * Copyright (c) 2000-2018, 达梦数据库有限公司.
  3. * All rights reserved.
  4. */
  5. package dm
  6. import (
  7. "bytes"
  8. "context"
  9. "database/sql"
  10. "database/sql/driver"
  11. "fmt"
  12. "sync/atomic"
  13. "gitee.com/chunanyong/dm/parser"
  14. "golang.org/x/text/encoding"
  15. )
  16. type DmConnection struct {
  17. filterable
  18. dmConnector *DmConnector
  19. Access *dm_build_697
  20. stmtMap map[int32]*DmStatement
  21. stmtPool []stmtPoolInfo
  22. lastExecInfo *execRetInfo
  23. lexer *parser.Lexer
  24. encode encoding.Encoding
  25. encodeBuffer *bytes.Buffer
  26. transformReaderDst []byte
  27. transformReaderSrc []byte
  28. serverEncoding string
  29. GlobalServerSeries int
  30. ServerVersion string
  31. Malini2 bool
  32. Execute2 bool
  33. LobEmptyCompOrcl bool
  34. IsoLevel int32
  35. ReadOnly bool
  36. NewLobFlag bool
  37. sslEncrypt int
  38. MaxRowSize int32
  39. DDLAutoCommit bool
  40. BackslashEscape bool
  41. SvrStat int32
  42. SvrMode int32
  43. ConstParaOpt bool
  44. DbTimezone int16
  45. LifeTimeRemainder int16
  46. InstanceName string
  47. Schema string
  48. LastLoginIP string
  49. LastLoginTime string
  50. FailedAttempts int32
  51. LoginWarningID int32
  52. GraceTimeRemainder int32
  53. Guid string
  54. DbName string
  55. StandbyHost string
  56. StandbyPort int32
  57. StandbyCount int32
  58. SessionID int64
  59. OracleDateLanguage byte
  60. FormatDate string
  61. FormatTimestamp string
  62. FormatTimestampTZ string
  63. FormatTime string
  64. FormatTimeTZ string
  65. Local bool
  66. MsgVersion int32
  67. TrxStatus int32
  68. dscControl bool
  69. trxFinish bool
  70. sessionID int64
  71. autoCommit bool
  72. isBatch bool
  73. watching bool
  74. watcher chan<- context.Context
  75. closech chan struct{}
  76. finished chan<- struct{}
  77. canceled atomicError
  78. closed atomicBool
  79. }
  80. func (conn *DmConnection) setTrxFinish(status int32) {
  81. switch status & Dm_build_1094 {
  82. case Dm_build_1091, Dm_build_1092, Dm_build_1093:
  83. conn.trxFinish = true
  84. default:
  85. conn.trxFinish = false
  86. }
  87. }
  88. func (dmConn *DmConnection) init() {
  89. if dmConn.dmConnector.stmtPoolMaxSize > 0 {
  90. dmConn.stmtPool = make([]stmtPoolInfo, 0, dmConn.dmConnector.stmtPoolMaxSize)
  91. }
  92. dmConn.stmtMap = make(map[int32]*DmStatement)
  93. dmConn.DbTimezone = 0
  94. dmConn.GlobalServerSeries = 0
  95. dmConn.MaxRowSize = 0
  96. dmConn.LobEmptyCompOrcl = false
  97. dmConn.ReadOnly = false
  98. dmConn.DDLAutoCommit = false
  99. dmConn.ConstParaOpt = false
  100. dmConn.IsoLevel = -1
  101. dmConn.sessionID = -1
  102. dmConn.Malini2 = true
  103. dmConn.NewLobFlag = true
  104. dmConn.Execute2 = true
  105. dmConn.serverEncoding = ENCODING_GB18030
  106. dmConn.TrxStatus = Dm_build_1042
  107. dmConn.OracleDateLanguage = byte(Locale)
  108. dmConn.lastExecInfo = NewExceInfo()
  109. dmConn.MsgVersion = Dm_build_975
  110. dmConn.idGenerator = dmConnIDGenerator
  111. }
  112. func (dmConn *DmConnection) reset() {
  113. dmConn.DbTimezone = 0
  114. dmConn.GlobalServerSeries = 0
  115. dmConn.MaxRowSize = 0
  116. dmConn.LobEmptyCompOrcl = false
  117. dmConn.ReadOnly = false
  118. dmConn.DDLAutoCommit = false
  119. dmConn.ConstParaOpt = false
  120. dmConn.IsoLevel = -1
  121. dmConn.sessionID = -1
  122. dmConn.Malini2 = true
  123. dmConn.NewLobFlag = true
  124. dmConn.Execute2 = true
  125. dmConn.serverEncoding = ENCODING_GB18030
  126. dmConn.TrxStatus = Dm_build_1042
  127. }
  128. func (dc *DmConnection) checkClosed() error {
  129. if dc.closed.IsSet() {
  130. return driver.ErrBadConn
  131. }
  132. return nil
  133. }
  134. func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
  135. stmt, err := NewDmStmt(dc, query)
  136. if err != nil {
  137. return nil, err
  138. }
  139. if execType == Dm_build_1059 {
  140. defer stmt.close()
  141. }
  142. stmt.innerUsed = true
  143. if stmt.dmConn.dmConnector.escapeProcess {
  144. stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
  145. if err != nil {
  146. stmt.close()
  147. return nil, err
  148. }
  149. }
  150. var optParamList []OptParameter
  151. if stmt.dmConn.ConstParaOpt {
  152. optParamList = make([]OptParameter, 0)
  153. stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, optParamList, stmt.dmConn.getServerEncoding())
  154. if err != nil {
  155. stmt.close()
  156. optParamList = nil
  157. }
  158. }
  159. if execType == Dm_build_1058 && dc.dmConnector.enRsCache {
  160. rpv, err := rp.get(stmt, query)
  161. if err != nil {
  162. return nil, err
  163. }
  164. if rpv != nil {
  165. stmt.execInfo = rpv.execInfo
  166. dc.lastExecInfo = rpv.execInfo
  167. return newDmRows(rpv.getResultSet(stmt)), nil
  168. }
  169. }
  170. var info *execRetInfo
  171. if optParamList != nil && len(optParamList) > 0 {
  172. info, err = dc.Access.Dm_build_776(stmt, optParamList)
  173. if err != nil {
  174. stmt.nativeSql = query
  175. info, err = dc.Access.Dm_build_782(stmt, execType)
  176. }
  177. } else {
  178. info, err = dc.Access.Dm_build_782(stmt, execType)
  179. }
  180. if err != nil {
  181. stmt.close()
  182. return nil, err
  183. }
  184. dc.lastExecInfo = info
  185. if info.hasResultSet {
  186. return newDmRows(newInnerRows(0, stmt, info)), nil
  187. } else {
  188. return newDmResult(stmt, info), nil
  189. }
  190. }
  191. func g2dbIsoLevel(isoLevel int32) int32 {
  192. switch isoLevel {
  193. case 1:
  194. return Dm_build_1046
  195. case 2:
  196. return Dm_build_1047
  197. case 4:
  198. return Dm_build_1048
  199. case 6:
  200. return Dm_build_1049
  201. default:
  202. return -1
  203. }
  204. }
  205. func (dc *DmConnection) Begin() (driver.Tx, error) {
  206. if len(dc.filterChain.filters) == 0 {
  207. return dc.begin()
  208. } else {
  209. return dc.filterChain.reset().DmConnectionBegin(dc)
  210. }
  211. }
  212. func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
  213. if len(dc.filterChain.filters) == 0 {
  214. return dc.beginTx(ctx, opts)
  215. }
  216. return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
  217. }
  218. func (dc *DmConnection) Commit() error {
  219. if len(dc.filterChain.filters) == 0 {
  220. return dc.commit()
  221. } else {
  222. return dc.filterChain.reset().DmConnectionCommit(dc)
  223. }
  224. }
  225. func (dc *DmConnection) Rollback() error {
  226. if len(dc.filterChain.filters) == 0 {
  227. return dc.rollback()
  228. } else {
  229. return dc.filterChain.reset().DmConnectionRollback(dc)
  230. }
  231. }
  232. func (dc *DmConnection) Close() error {
  233. if len(dc.filterChain.filters) == 0 {
  234. return dc.close()
  235. } else {
  236. return dc.filterChain.reset().DmConnectionClose(dc)
  237. }
  238. }
  239. func (dc *DmConnection) Ping(ctx context.Context) error {
  240. if len(dc.filterChain.filters) == 0 {
  241. return dc.ping(ctx)
  242. } else {
  243. return dc.filterChain.reset().DmConnectionPing(dc, ctx)
  244. }
  245. }
  246. func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
  247. if len(dc.filterChain.filters) == 0 {
  248. return dc.exec(query, args)
  249. }
  250. return dc.filterChain.reset().DmConnectionExec(dc, query, args)
  251. }
  252. func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
  253. if len(dc.filterChain.filters) == 0 {
  254. return dc.execContext(ctx, query, args)
  255. }
  256. return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
  257. }
  258. func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
  259. if len(dc.filterChain.filters) == 0 {
  260. return dc.query(query, args)
  261. }
  262. return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
  263. }
  264. func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
  265. if len(dc.filterChain.filters) == 0 {
  266. return dc.queryContext(ctx, query, args)
  267. }
  268. return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
  269. }
  270. func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
  271. if len(dc.filterChain.filters) == 0 {
  272. return dc.prepare(query)
  273. }
  274. return dc.filterChain.reset().DmConnectionPrepare(dc, query)
  275. }
  276. func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
  277. if len(dc.filterChain.filters) == 0 {
  278. return dc.prepareContext(ctx, query)
  279. }
  280. return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
  281. }
  282. func (dc *DmConnection) ResetSession(ctx context.Context) error {
  283. if len(dc.filterChain.filters) == 0 {
  284. return dc.resetSession(ctx)
  285. }
  286. return dc.filterChain.reset().DmConnectionResetSession(dc, ctx)
  287. }
  288. func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
  289. if len(dc.filterChain.filters) == 0 {
  290. return dc.checkNamedValue(nv)
  291. }
  292. return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
  293. }
  294. func (dc *DmConnection) begin() (*DmConnection, error) {
  295. return dc.beginTx(context.Background(), driver.TxOptions{driver.IsolationLevel(sql.LevelDefault), false})
  296. }
  297. func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
  298. if err := dc.watchCancel(ctx); err != nil {
  299. return nil, err
  300. }
  301. defer dc.finish()
  302. err := dc.checkClosed()
  303. if err != nil {
  304. return nil, err
  305. }
  306. dc.autoCommit = false
  307. if sql.IsolationLevel(opts.Isolation) == sql.LevelDefault {
  308. opts.Isolation = driver.IsolationLevel(sql.LevelReadCommitted)
  309. }
  310. if dc.ReadOnly != opts.ReadOnly {
  311. dc.ReadOnly = opts.ReadOnly
  312. var readonly = 0
  313. if opts.ReadOnly {
  314. readonly = 1
  315. }
  316. dc.exec(fmt.Sprintf("SP_SET_SESSION_READONLY(%d)", readonly), nil)
  317. }
  318. if dc.IsoLevel != int32(opts.Isolation) {
  319. switch sql.IsolationLevel(opts.Isolation) {
  320. case sql.LevelDefault, sql.LevelReadUncommitted:
  321. return dc, nil
  322. case sql.LevelReadCommitted, sql.LevelSerializable:
  323. dc.IsoLevel = int32(opts.Isolation)
  324. case sql.LevelRepeatableRead:
  325. if dc.CompatibleMysql() {
  326. dc.IsoLevel = int32(sql.LevelReadCommitted)
  327. } else {
  328. return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
  329. }
  330. default:
  331. return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
  332. }
  333. err = dc.Access.Dm_build_836(dc)
  334. if err != nil {
  335. return nil, err
  336. }
  337. }
  338. return dc, nil
  339. }
  340. func (dc *DmConnection) commit() error {
  341. err := dc.checkClosed()
  342. if err != nil {
  343. return err
  344. }
  345. defer func() {
  346. dc.autoCommit = dc.dmConnector.autoCommit
  347. if dc.ReadOnly {
  348. dc.exec("SP_SET_SESSION_READONLY(0)", nil)
  349. }
  350. }()
  351. if !dc.autoCommit {
  352. err = dc.Access.Commit()
  353. if err != nil {
  354. return err
  355. }
  356. dc.trxFinish = true
  357. return nil
  358. } else if !dc.dmConnector.alwayseAllowCommit {
  359. return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
  360. }
  361. return nil
  362. }
  363. func (dc *DmConnection) rollback() error {
  364. err := dc.checkClosed()
  365. if err != nil {
  366. return err
  367. }
  368. defer func() {
  369. dc.autoCommit = dc.dmConnector.autoCommit
  370. if dc.ReadOnly {
  371. dc.exec("SP_SET_SESSION_READONLY(0)", nil)
  372. }
  373. }()
  374. if !dc.autoCommit {
  375. err = dc.Access.Rollback()
  376. if err != nil {
  377. return err
  378. }
  379. dc.trxFinish = true
  380. return nil
  381. } else if !dc.dmConnector.alwayseAllowCommit {
  382. return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
  383. }
  384. return nil
  385. }
  386. func (dc *DmConnection) reconnect() error {
  387. err := dc.Access.Close()
  388. if err != nil {
  389. return err
  390. }
  391. for _, stmt := range dc.stmtMap {
  392. stmt.closed = true
  393. for id, _ := range stmt.rsMap {
  394. delete(stmt.rsMap, id)
  395. }
  396. }
  397. if dc.stmtPool != nil {
  398. dc.stmtPool = dc.stmtPool[:0]
  399. }
  400. dc.dmConnector.reConnection = dc
  401. if dc.dmConnector.group != nil {
  402. _, err = dc.dmConnector.group.connect(dc.dmConnector)
  403. if err != nil {
  404. return err
  405. }
  406. } else {
  407. _, err = dc.dmConnector.connect(context.Background())
  408. }
  409. for _, stmt := range dc.stmtMap {
  410. err = dc.Access.Dm_build_754(stmt)
  411. if err != nil {
  412. return err
  413. }
  414. if stmt.paramCount > 0 {
  415. err = stmt.prepare()
  416. if err != nil {
  417. return err
  418. }
  419. }
  420. }
  421. return nil
  422. }
  423. func (dc *DmConnection) cleanup() {
  424. dc.close()
  425. }
  426. func (dc *DmConnection) close() error {
  427. if !dc.closed.TrySet(true) {
  428. return nil
  429. }
  430. close(dc.closech)
  431. if dc.Access == nil {
  432. return nil
  433. }
  434. dc.rollback()
  435. for _, stmt := range dc.stmtMap {
  436. stmt.free()
  437. }
  438. if dc.stmtPool != nil {
  439. for _, spi := range dc.stmtPool {
  440. dc.Access.Dm_build_759(spi.id)
  441. }
  442. dc.stmtPool = nil
  443. }
  444. dc.Access.Close()
  445. return nil
  446. }
  447. func (dc *DmConnection) ping(ctx context.Context) error {
  448. if err := dc.watchCancel(ctx); err != nil {
  449. return err
  450. }
  451. defer dc.finish()
  452. rows, err := dc.query("select 1", nil)
  453. if err != nil {
  454. return err
  455. }
  456. return rows.close()
  457. }
  458. func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
  459. err := dc.checkClosed()
  460. if err != nil {
  461. return nil, err
  462. }
  463. if args != nil && len(args) > 0 {
  464. stmt, err := dc.prepare(query)
  465. defer stmt.close()
  466. if err != nil {
  467. return nil, err
  468. }
  469. dc.lastExecInfo = stmt.execInfo
  470. return stmt.exec(args)
  471. } else {
  472. r1, err := dc.executeInner(query, Dm_build_1059)
  473. if err != nil {
  474. return nil, err
  475. }
  476. if r2, ok := r1.(*DmResult); ok {
  477. return r2, nil
  478. } else {
  479. return nil, ECGO_NOT_EXEC_SQL.throw()
  480. }
  481. }
  482. }
  483. func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
  484. if err := dc.watchCancel(ctx); err != nil {
  485. return nil, err
  486. }
  487. defer dc.finish()
  488. err := dc.checkClosed()
  489. if err != nil {
  490. return nil, err
  491. }
  492. if args != nil && len(args) > 0 {
  493. stmt, err := dc.prepare(query)
  494. defer stmt.close()
  495. if err != nil {
  496. return nil, err
  497. }
  498. dc.lastExecInfo = stmt.execInfo
  499. dargs, err := namedValueToValue(stmt, args)
  500. if err != nil {
  501. return nil, err
  502. }
  503. return stmt.exec(dargs)
  504. } else {
  505. r1, err := dc.executeInner(query, Dm_build_1059)
  506. if err != nil {
  507. return nil, err
  508. }
  509. if r2, ok := r1.(*DmResult); ok {
  510. return r2, nil
  511. } else {
  512. return nil, ECGO_NOT_EXEC_SQL.throw()
  513. }
  514. }
  515. }
  516. func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
  517. err := dc.checkClosed()
  518. if err != nil {
  519. return nil, err
  520. }
  521. if args != nil && len(args) > 0 {
  522. stmt, err := dc.prepare(query)
  523. if err != nil {
  524. stmt.close()
  525. return nil, err
  526. }
  527. dc.lastExecInfo = stmt.execInfo
  528. stmt.innerUsed = true
  529. return stmt.query(args)
  530. } else {
  531. r1, err := dc.executeInner(query, Dm_build_1058)
  532. if err != nil {
  533. return nil, err
  534. }
  535. if r2, ok := r1.(*DmRows); ok {
  536. return r2, nil
  537. } else {
  538. return nil, ECGO_NOT_QUERY_SQL.throw()
  539. }
  540. }
  541. }
  542. func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
  543. if err := dc.watchCancel(ctx); err != nil {
  544. return nil, err
  545. }
  546. defer dc.finish()
  547. err := dc.checkClosed()
  548. if err != nil {
  549. return nil, err
  550. }
  551. if args != nil && len(args) > 0 {
  552. stmt, err := dc.prepare(query)
  553. if err != nil {
  554. stmt.close()
  555. return nil, err
  556. }
  557. dc.lastExecInfo = stmt.execInfo
  558. stmt.innerUsed = true
  559. dargs, err := namedValueToValue(stmt, args)
  560. if err != nil {
  561. return nil, err
  562. }
  563. return stmt.query(dargs)
  564. } else {
  565. r1, err := dc.executeInner(query, Dm_build_1058)
  566. if err != nil {
  567. return nil, err
  568. }
  569. if r2, ok := r1.(*DmRows); ok {
  570. return r2, nil
  571. } else {
  572. return nil, ECGO_NOT_QUERY_SQL.throw()
  573. }
  574. }
  575. }
  576. func (dc *DmConnection) prepare(query string) (*DmStatement, error) {
  577. err := dc.checkClosed()
  578. if err != nil {
  579. return nil, err
  580. }
  581. stmt, err := NewDmStmt(dc, query)
  582. if err != nil {
  583. return nil, err
  584. }
  585. err = stmt.prepare()
  586. return stmt, err
  587. }
  588. func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
  589. if err := dc.watchCancel(ctx); err != nil {
  590. return nil, err
  591. }
  592. defer dc.finish()
  593. err := dc.checkClosed()
  594. if err != nil {
  595. return nil, err
  596. }
  597. stmt, err := dc.prepare(query)
  598. if err != nil {
  599. return nil, err
  600. }
  601. return stmt, nil
  602. }
  603. func (dc *DmConnection) resetSession(ctx context.Context) error {
  604. if err := dc.watchCancel(ctx); err != nil {
  605. return err
  606. }
  607. defer dc.finish()
  608. err := dc.checkClosed()
  609. if err != nil {
  610. return err
  611. }
  612. for _, stmt := range dc.stmtMap {
  613. stmt.inUse = false
  614. }
  615. return nil
  616. }
  617. func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
  618. var err error
  619. var cvt = converter{dc, false}
  620. nv.Value, err = cvt.ConvertValue(nv.Value)
  621. dc.isBatch = cvt.isBatch
  622. return err
  623. }
  624. func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
  625. stmt, err := NewDmStmt(dc, query)
  626. if err != nil {
  627. return nil, nil, err
  628. }
  629. stmt.innerUsed = true
  630. stmt.innerExec = true
  631. info, err := dc.Access.Dm_build_782(stmt, Dm_build_1058)
  632. if err != nil {
  633. return nil, nil, err
  634. }
  635. dc.lastExecInfo = info
  636. stmt.innerExec = false
  637. return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
  638. }
  639. func (dc *DmConnection) getIndexOnEPGroup() int32 {
  640. if dc.dmConnector.group == nil || dc.dmConnector.group.epList == nil {
  641. return -1
  642. }
  643. for i := 0; i < len(dc.dmConnector.group.epList); i++ {
  644. ep := dc.dmConnector.group.epList[i]
  645. if dc.dmConnector.host == ep.host && dc.dmConnector.port == ep.port {
  646. return int32(i)
  647. }
  648. }
  649. return -1
  650. }
  651. func (dc *DmConnection) getServerEncoding() string {
  652. if dc.dmConnector.charCode != "" {
  653. return dc.dmConnector.charCode
  654. }
  655. return dc.serverEncoding
  656. }
  657. func (dc *DmConnection) lobFetchAll() bool {
  658. return dc.dmConnector.lobMode == 2
  659. }
  660. func (conn *DmConnection) CompatibleOracle() bool {
  661. return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_ORACLE
  662. }
  663. func (conn *DmConnection) CompatibleMysql() bool {
  664. return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_MYSQL
  665. }
  666. func (conn *DmConnection) cancel(err error) {
  667. conn.canceled.Set(err)
  668. conn.close()
  669. }
  670. func (conn *DmConnection) finish() {
  671. if !conn.watching || conn.finished == nil {
  672. return
  673. }
  674. select {
  675. case conn.finished <- struct{}{}:
  676. conn.watching = false
  677. case <-conn.closech:
  678. }
  679. }
  680. func (conn *DmConnection) startWatcher() {
  681. watcher := make(chan context.Context, 1)
  682. conn.watcher = watcher
  683. finished := make(chan struct{})
  684. conn.finished = finished
  685. go func() {
  686. for {
  687. var ctx context.Context
  688. select {
  689. case ctx = <-watcher:
  690. case <-conn.closech:
  691. return
  692. }
  693. select {
  694. case <-ctx.Done():
  695. conn.cancel(ctx.Err())
  696. case <-finished:
  697. case <-conn.closech:
  698. return
  699. }
  700. }
  701. }()
  702. }
  703. func (conn *DmConnection) watchCancel(ctx context.Context) error {
  704. if conn.watching {
  705. conn.cleanup()
  706. return nil
  707. }
  708. if err := ctx.Err(); err != nil {
  709. return err
  710. }
  711. if ctx.Done() == nil {
  712. return nil
  713. }
  714. if conn.watcher == nil {
  715. return nil
  716. }
  717. conn.watching = true
  718. conn.watcher <- ctx
  719. return nil
  720. }
  // noCopy marks a struct as not safe to copy after first use when it is
  // placed in that struct as a field.
  type noCopy struct{}

  // Lock is a no-op. Together with Unlock it makes noCopy look like a
  // sync.Locker, which is what `go vet -copylocks` checks for.
  func (*noCopy) Lock() {}

  // Unlock is a no-op, see Lock.
  func (*noCopy) Unlock() {}

  // atomicBool is a boolean flag accessed with atomic operations.
  type atomicBool struct {
  	_noCopy noCopy
  	value   uint32 // 0 = false, 1 = true
  }

  // IsSet reports whether the flag is currently true.
  func (ab *atomicBool) IsSet() bool {
  	return atomic.LoadUint32(&ab.value) > 0
  }

  // Set stores value regardless of the previous state.
  func (ab *atomicBool) Set(value bool) {
  	var raw uint32
  	if value {
  		raw = 1
  	}
  	atomic.StoreUint32(&ab.value, raw)
  }

  // TrySet stores value and reports whether this call changed the flag.
  func (ab *atomicBool) TrySet(value bool) bool {
  	if value {
  		return atomic.SwapUint32(&ab.value, 1) == 0
  	}
  	return atomic.SwapUint32(&ab.value, 0) > 0
  }

  // atomicErrorBox gives every stored error the same concrete type.
  // atomic.Value panics when asked to store nil or values of differing
  // concrete types, and both can occur with plain error values.
  type atomicErrorBox struct {
  	err error
  }

  // atomicError is an error value accessed with atomic operations.
  type atomicError struct {
  	_noCopy noCopy
  	value   atomic.Value
  }

  // Set stores value regardless of the previous one. A nil value is allowed
  // and clears the stored error.
  func (ae *atomicError) Set(value error) {
  	ae.value.Store(atomicErrorBox{err: value})
  }

  // Value returns the stored error, or nil when none has been set.
  func (ae *atomicError) Value() error {
  	if box, ok := ae.value.Load().(atomicErrorBox); ok {
  		return box.err
  	}
  	return nil
  }