http urls monitor.

database.go 17KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732
  1. package sqladapter
  2. import (
  3. "context"
  4. "database/sql"
  5. "math"
  6. "strconv"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "upper.io/db.v3"
  11. "upper.io/db.v3/internal/cache"
  12. "upper.io/db.v3/internal/sqladapter/compat"
  13. "upper.io/db.v3/internal/sqladapter/exql"
  14. "upper.io/db.v3/lib/sqlbuilder"
  15. )
  16. var (
  17. lastSessID uint64
  18. lastTxID uint64
  19. )
  20. // hasCleanUp is implemented by structs that have a clean up routine that needs
  21. // to be called before Close().
  22. type hasCleanUp interface {
  23. CleanUp() error
  24. }
  25. // hasStatementExec allows the adapter to have its own exec statement.
  26. type hasStatementExec interface {
  27. StatementExec(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
  28. }
  29. type hasConvertValues interface {
  30. ConvertValues(values []interface{}) []interface{}
  31. }
  32. // Database represents a SQL database.
  33. type Database interface {
  34. PartialDatabase
  35. BaseDatabase
  36. }
  37. // PartialDatabase defines methods to be implemented by SQL database adapters.
  38. type PartialDatabase interface {
  39. sqlbuilder.SQLBuilder
  40. // Collections returns a list of non-system tables from the database.
  41. Collections() ([]string, error)
  42. // Open opens a new connection
  43. Open(db.ConnectionURL) error
  44. // TableExists returns an error if the given table does not exist.
  45. TableExists(name string) error
  46. // LookupName returns the name of the database.
  47. LookupName() (string, error)
  48. // PrimaryKeys returns all primary keys on the table.
  49. PrimaryKeys(name string) ([]string, error)
  50. // NewCollection allocates a new collection by name.
  51. NewCollection(name string) db.Collection
  52. // CompileStatement transforms an internal statement into a format
  53. // database/sql can understand.
  54. CompileStatement(stmt *exql.Statement, args []interface{}) (string, []interface{})
  55. // ConnectionURL returns the database's connection URL, if any.
  56. ConnectionURL() db.ConnectionURL
  57. // Err wraps specific database errors (given in string form) and transforms them
  58. // into error values.
  59. Err(in error) (out error)
  60. // NewDatabaseTx begins a transaction block and returns a new
  61. // session backed by it.
  62. NewDatabaseTx(ctx context.Context) (DatabaseTx, error)
  63. }
  64. // BaseDatabase provides logic for methods that can be shared across all SQL
  65. // adapters.
  66. type BaseDatabase interface {
  67. db.Settings
  68. // Name returns the name of the database.
  69. Name() string
  70. // Close closes the database session
  71. Close() error
  72. // Ping checks if the database server is reachable.
  73. Ping() error
  74. // ClearCache clears all caches the session is using
  75. ClearCache()
  76. // Collection returns a new collection.
  77. Collection(string) db.Collection
  78. // Driver returns the underlying driver the session is using
  79. Driver() interface{}
  80. // WaitForConnection attempts to run the given connection function a fixed
  81. // number of times before failing.
  82. WaitForConnection(func() error) error
  83. // BindSession sets the *sql.DB the session will use.
  84. BindSession(*sql.DB) error
  85. // Session returns the *sql.DB the session is using.
  86. Session() *sql.DB
  87. // BindTx binds a transaction to the current session.
  88. BindTx(context.Context, *sql.Tx) error
  89. // Returns the current transaction the session is using.
  90. Transaction() BaseTx
  91. // NewClone clones the database using the given PartialDatabase as base.
  92. NewClone(PartialDatabase, bool) (BaseDatabase, error)
  93. // Context returns the default context the session is using.
  94. Context() context.Context
  95. // SetContext sets a default context for the session.
  96. SetContext(context.Context)
  97. // TxOptions returns the default TxOptions for new transactions in the
  98. // session.
  99. TxOptions() *sql.TxOptions
  100. // SetTxOptions sets default TxOptions for the session.
  101. SetTxOptions(txOptions sql.TxOptions)
  102. }
  103. // NewBaseDatabase provides a BaseDatabase given a PartialDatabase
  104. func NewBaseDatabase(p PartialDatabase) BaseDatabase {
  105. d := &database{
  106. Settings: db.NewSettings(),
  107. PartialDatabase: p,
  108. cachedCollections: cache.NewCache(),
  109. cachedStatements: cache.NewCache(),
  110. }
  111. return d
  112. }
  113. // database is the actual implementation of Database and joins methods from
  114. // BaseDatabase and PartialDatabase
  115. type database struct {
  116. PartialDatabase
  117. db.Settings
  118. lookupNameOnce sync.Once
  119. name string
  120. mu sync.Mutex // guards ctx, txOptions
  121. ctx context.Context
  122. txOptions *sql.TxOptions
  123. sessMu sync.Mutex // guards sess, baseTx
  124. sess *sql.DB
  125. baseTx BaseTx
  126. sessID uint64
  127. txID uint64
  128. cacheMu sync.Mutex // guards cachedStatements and cachedCollections
  129. cachedStatements *cache.Cache
  130. cachedCollections *cache.Cache
  131. template *exql.Template
  132. }
  133. var (
  134. _ = db.Database(&database{})
  135. )
  136. // Session returns the underlying *sql.DB
  137. func (d *database) Session() *sql.DB {
  138. return d.sess
  139. }
  140. // SetContext sets the session's default context.
  141. func (d *database) SetContext(ctx context.Context) {
  142. d.mu.Lock()
  143. d.ctx = ctx
  144. d.mu.Unlock()
  145. }
  146. // Context returns the session's default context.
  147. func (d *database) Context() context.Context {
  148. d.mu.Lock()
  149. defer d.mu.Unlock()
  150. if d.ctx == nil {
  151. return context.Background()
  152. }
  153. return d.ctx
  154. }
  155. // SetTxOptions sets the session's default TxOptions.
  156. func (d *database) SetTxOptions(txOptions sql.TxOptions) {
  157. d.mu.Lock()
  158. d.txOptions = &txOptions
  159. d.mu.Unlock()
  160. }
  161. // TxOptions returns the session's default TxOptions.
  162. func (d *database) TxOptions() *sql.TxOptions {
  163. d.mu.Lock()
  164. defer d.mu.Unlock()
  165. if d.txOptions == nil {
  166. return nil
  167. }
  168. return d.txOptions
  169. }
  170. // BindTx binds a *sql.Tx into *database
  171. func (d *database) BindTx(ctx context.Context, t *sql.Tx) error {
  172. d.sessMu.Lock()
  173. defer d.sessMu.Unlock()
  174. d.baseTx = newBaseTx(t)
  175. if err := d.Ping(); err != nil {
  176. return err
  177. }
  178. d.SetContext(ctx)
  179. d.txID = newBaseTxID()
  180. return nil
  181. }
  182. // Tx returns a BaseTx, which, if not nil, means that this session is within a
  183. // transaction
  184. func (d *database) Transaction() BaseTx {
  185. return d.baseTx
  186. }
  187. // Name returns the database named
  188. func (d *database) Name() string {
  189. d.lookupNameOnce.Do(func() {
  190. if d.name == "" {
  191. d.name, _ = d.PartialDatabase.LookupName()
  192. }
  193. })
  194. return d.name
  195. }
  196. // BindSession binds a *sql.DB into *database
  197. func (d *database) BindSession(sess *sql.DB) error {
  198. d.sessMu.Lock()
  199. d.sess = sess
  200. d.sessMu.Unlock()
  201. if err := d.Ping(); err != nil {
  202. return err
  203. }
  204. d.sessID = newSessionID()
  205. name, err := d.PartialDatabase.LookupName()
  206. if err != nil {
  207. return err
  208. }
  209. d.name = name
  210. return nil
  211. }
  212. // Ping checks whether a connection to the database is still alive by pinging
  213. // it
  214. func (d *database) Ping() error {
  215. if d.sess != nil {
  216. return d.sess.Ping()
  217. }
  218. return nil
  219. }
  220. // SetConnMaxLifetime sets the maximum amount of time a connection may be
  221. // reused.
  222. func (d *database) SetConnMaxLifetime(t time.Duration) {
  223. d.Settings.SetConnMaxLifetime(t)
  224. if sess := d.Session(); sess != nil {
  225. sess.SetConnMaxLifetime(d.Settings.ConnMaxLifetime())
  226. }
  227. }
  228. // SetMaxIdleConns sets the maximum number of connections in the idle
  229. // connection pool.
  230. func (d *database) SetMaxIdleConns(n int) {
  231. d.Settings.SetMaxIdleConns(n)
  232. if sess := d.Session(); sess != nil {
  233. sess.SetMaxIdleConns(d.MaxIdleConns())
  234. }
  235. }
  236. // SetMaxOpenConns sets the maximum number of open connections to the
  237. // database.
  238. func (d *database) SetMaxOpenConns(n int) {
  239. d.Settings.SetMaxOpenConns(n)
  240. if sess := d.Session(); sess != nil {
  241. sess.SetMaxOpenConns(d.MaxOpenConns())
  242. }
  243. }
  244. // ClearCache removes all caches.
  245. func (d *database) ClearCache() {
  246. d.cacheMu.Lock()
  247. defer d.cacheMu.Unlock()
  248. d.cachedCollections.Clear()
  249. d.cachedStatements.Clear()
  250. if d.template != nil {
  251. d.template.Cache.Clear()
  252. }
  253. }
  254. // NewClone binds a clone that is linked to the current
  255. // session. This is commonly done before creating a transaction
  256. // session.
  257. func (d *database) NewClone(p PartialDatabase, checkConn bool) (BaseDatabase, error) {
  258. nd := NewBaseDatabase(p).(*database)
  259. nd.name = d.name
  260. nd.sess = d.sess
  261. if checkConn {
  262. if err := nd.Ping(); err != nil {
  263. return nil, err
  264. }
  265. }
  266. nd.sessID = newSessionID()
  267. // New transaction should inherit parent settings
  268. copySettings(d, nd)
  269. return nd, nil
  270. }
  271. // Close terminates the current database session
  272. func (d *database) Close() error {
  273. defer func() {
  274. d.sessMu.Lock()
  275. d.sess = nil
  276. d.baseTx = nil
  277. d.sessMu.Unlock()
  278. }()
  279. if d.sess != nil {
  280. if cleaner, ok := d.PartialDatabase.(hasCleanUp); ok {
  281. cleaner.CleanUp()
  282. }
  283. d.cachedCollections.Clear()
  284. d.cachedStatements.Clear() // Closes prepared statements as well.
  285. tx := d.Transaction()
  286. if tx == nil {
  287. // Not within a transaction.
  288. return d.sess.Close()
  289. }
  290. if !tx.Committed() {
  291. tx.Rollback()
  292. return nil
  293. }
  294. }
  295. return nil
  296. }
  297. // Collection returns a db.Collection given a name. Results are cached.
  298. func (d *database) Collection(name string) db.Collection {
  299. d.cacheMu.Lock()
  300. defer d.cacheMu.Unlock()
  301. h := cache.String(name)
  302. ccol, ok := d.cachedCollections.ReadRaw(h)
  303. if ok {
  304. return ccol.(db.Collection)
  305. }
  306. col := d.PartialDatabase.NewCollection(name)
  307. d.cachedCollections.Write(h, col)
  308. return col
  309. }
  310. // StatementPrepare creates a prepared statement.
  311. func (d *database) StatementPrepare(ctx context.Context, stmt *exql.Statement) (sqlStmt *sql.Stmt, err error) {
  312. var query string
  313. if d.Settings.LoggingEnabled() {
  314. defer func(start time.Time) {
  315. d.Logger().Log(&db.QueryStatus{
  316. TxID: d.txID,
  317. SessID: d.sessID,
  318. Query: query,
  319. Err: err,
  320. Start: start,
  321. End: time.Now(),
  322. Context: ctx,
  323. })
  324. }(time.Now())
  325. }
  326. tx := d.Transaction()
  327. query, _ = d.compileStatement(stmt, nil)
  328. if tx != nil {
  329. sqlStmt, err = compat.PrepareContext(tx.(*baseTx), ctx, query)
  330. return
  331. }
  332. sqlStmt, err = compat.PrepareContext(d.sess, ctx, query)
  333. return
  334. }
  335. // ConvertValues converts native values into driver specific values.
  336. func (d *database) ConvertValues(values []interface{}) []interface{} {
  337. if converter, ok := d.PartialDatabase.(hasConvertValues); ok {
  338. return converter.ConvertValues(values)
  339. }
  340. return values
  341. }
  342. // StatementExec compiles and executes a statement that does not return any
  343. // rows.
  344. func (d *database) StatementExec(ctx context.Context, stmt *exql.Statement, args ...interface{}) (res sql.Result, err error) {
  345. var query string
  346. if d.Settings.LoggingEnabled() {
  347. defer func(start time.Time) {
  348. status := db.QueryStatus{
  349. TxID: d.txID,
  350. SessID: d.sessID,
  351. Query: query,
  352. Args: args,
  353. Err: err,
  354. Start: start,
  355. End: time.Now(),
  356. Context: ctx,
  357. }
  358. if res != nil {
  359. if rowsAffected, err := res.RowsAffected(); err == nil {
  360. status.RowsAffected = &rowsAffected
  361. }
  362. if lastInsertID, err := res.LastInsertId(); err == nil {
  363. status.LastInsertID = &lastInsertID
  364. }
  365. }
  366. d.Logger().Log(&status)
  367. }(time.Now())
  368. }
  369. if execer, ok := d.PartialDatabase.(hasStatementExec); ok {
  370. query, args = d.compileStatement(stmt, args)
  371. res, err = execer.StatementExec(ctx, query, args...)
  372. return
  373. }
  374. tx := d.Transaction()
  375. if d.Settings.PreparedStatementCacheEnabled() && tx == nil {
  376. var p *Stmt
  377. if p, query, args, err = d.prepareStatement(ctx, stmt, args); err != nil {
  378. return nil, err
  379. }
  380. defer p.Close()
  381. res, err = compat.PreparedExecContext(p, ctx, args)
  382. return
  383. }
  384. query, args = d.compileStatement(stmt, args)
  385. if tx != nil {
  386. res, err = compat.ExecContext(tx.(*baseTx), ctx, query, args)
  387. return
  388. }
  389. res, err = compat.ExecContext(d.sess, ctx, query, args)
  390. return
  391. }
  392. // StatementQuery compiles and executes a statement that returns rows.
  393. func (d *database) StatementQuery(ctx context.Context, stmt *exql.Statement, args ...interface{}) (rows *sql.Rows, err error) {
  394. var query string
  395. if d.Settings.LoggingEnabled() {
  396. defer func(start time.Time) {
  397. d.Logger().Log(&db.QueryStatus{
  398. TxID: d.txID,
  399. SessID: d.sessID,
  400. Query: query,
  401. Args: args,
  402. Err: err,
  403. Start: start,
  404. End: time.Now(),
  405. Context: ctx,
  406. })
  407. }(time.Now())
  408. }
  409. tx := d.Transaction()
  410. if d.Settings.PreparedStatementCacheEnabled() && tx == nil {
  411. var p *Stmt
  412. if p, query, args, err = d.prepareStatement(ctx, stmt, args); err != nil {
  413. return nil, err
  414. }
  415. defer p.Close()
  416. rows, err = compat.PreparedQueryContext(p, ctx, args)
  417. return
  418. }
  419. query, args = d.compileStatement(stmt, args)
  420. if tx != nil {
  421. rows, err = compat.QueryContext(tx.(*baseTx), ctx, query, args)
  422. return
  423. }
  424. rows, err = compat.QueryContext(d.sess, ctx, query, args)
  425. return
  426. }
  427. // StatementQueryRow compiles and executes a statement that returns at most one
  428. // row.
  429. func (d *database) StatementQueryRow(ctx context.Context, stmt *exql.Statement, args ...interface{}) (row *sql.Row, err error) {
  430. var query string
  431. if d.Settings.LoggingEnabled() {
  432. defer func(start time.Time) {
  433. d.Logger().Log(&db.QueryStatus{
  434. TxID: d.txID,
  435. SessID: d.sessID,
  436. Query: query,
  437. Args: args,
  438. Err: err,
  439. Start: start,
  440. End: time.Now(),
  441. Context: ctx,
  442. })
  443. }(time.Now())
  444. }
  445. tx := d.Transaction()
  446. if d.Settings.PreparedStatementCacheEnabled() && tx == nil {
  447. var p *Stmt
  448. if p, query, args, err = d.prepareStatement(ctx, stmt, args); err != nil {
  449. return nil, err
  450. }
  451. defer p.Close()
  452. row = compat.PreparedQueryRowContext(p, ctx, args)
  453. return
  454. }
  455. query, args = d.compileStatement(stmt, args)
  456. if tx != nil {
  457. row = compat.QueryRowContext(tx.(*baseTx), ctx, query, args)
  458. return
  459. }
  460. row = compat.QueryRowContext(d.sess, ctx, query, args)
  461. return
  462. }
  463. // Driver returns the underlying *sql.DB or *sql.Tx instance.
  464. func (d *database) Driver() interface{} {
  465. if tx := d.Transaction(); tx != nil {
  466. // A transaction
  467. return tx.(*baseTx).Tx
  468. }
  469. return d.sess
  470. }
  471. // compileStatement compiles the given statement into a string.
  472. func (d *database) compileStatement(stmt *exql.Statement, args []interface{}) (string, []interface{}) {
  473. if converter, ok := d.PartialDatabase.(hasConvertValues); ok {
  474. args = converter.ConvertValues(args)
  475. }
  476. return d.PartialDatabase.CompileStatement(stmt, args)
  477. }
  478. // prepareStatement compiles a query and tries to use previously generated
  479. // statement.
  480. func (d *database) prepareStatement(ctx context.Context, stmt *exql.Statement, args []interface{}) (*Stmt, string, []interface{}, error) {
  481. d.sessMu.Lock()
  482. defer d.sessMu.Unlock()
  483. sess, tx := d.sess, d.Transaction()
  484. if sess == nil && tx == nil {
  485. return nil, "", nil, db.ErrNotConnected
  486. }
  487. pc, ok := d.cachedStatements.ReadRaw(stmt)
  488. if ok {
  489. // The statement was cached.
  490. ps, err := pc.(*Stmt).Open()
  491. if err == nil {
  492. _, args = d.compileStatement(stmt, args)
  493. return ps, ps.query, args, nil
  494. }
  495. }
  496. query, args := d.compileStatement(stmt, args)
  497. sqlStmt, err := func(query *string) (*sql.Stmt, error) {
  498. if tx != nil {
  499. return compat.PrepareContext(tx.(*baseTx), ctx, *query)
  500. }
  501. return compat.PrepareContext(sess, ctx, *query)
  502. }(&query)
  503. if err != nil {
  504. return nil, "", nil, err
  505. }
  506. p, err := NewStatement(sqlStmt, query).Open()
  507. if err != nil {
  508. return nil, query, args, err
  509. }
  510. d.cachedStatements.Write(stmt, p)
  511. return p, p.query, args, nil
  512. }
  513. var waitForConnMu sync.Mutex
  514. // WaitForConnection tries to execute the given connectFn function, if
  515. // connectFn returns an error, then WaitForConnection will keep trying until
  516. // connectFn returns nil. Maximum waiting time is 5s after having acquired the
  517. // lock.
  518. func (d *database) WaitForConnection(connectFn func() error) error {
  519. // This lock ensures first-come, first-served and prevents opening too many
  520. // file descriptors.
  521. waitForConnMu.Lock()
  522. defer waitForConnMu.Unlock()
  523. // Minimum waiting time.
  524. waitTime := time.Millisecond * 10
  525. // Waitig 5 seconds for a successful connection.
  526. for timeStart := time.Now(); time.Now().Sub(timeStart) < time.Second*5; {
  527. err := connectFn()
  528. if err == nil {
  529. return nil // Connected!
  530. }
  531. // Only attempt to reconnect if the error is too many clients.
  532. if d.PartialDatabase.Err(err) == db.ErrTooManyClients {
  533. // Sleep and try again if, and only if, the server replied with a "too
  534. // many clients" error.
  535. time.Sleep(waitTime)
  536. if waitTime < time.Millisecond*500 {
  537. // Wait a bit more next time.
  538. waitTime = waitTime * 2
  539. }
  540. continue
  541. }
  542. // Return any other error immediately.
  543. return err
  544. }
  545. return db.ErrGivingUpTryingToConnect
  546. }
  547. // ReplaceWithDollarSign turns a SQL statament with '?' placeholders into
  548. // dollar placeholders, like $1, $2, ..., $n
  549. func ReplaceWithDollarSign(in string) string {
  550. buf := []byte(in)
  551. out := make([]byte, 0, len(buf))
  552. i, j, k, t := 0, 1, 0, len(buf)
  553. for i < t {
  554. if buf[i] == '?' {
  555. out = append(out, buf[k:i]...)
  556. k = i + 1
  557. if k < t && buf[k] == '?' {
  558. i = k
  559. } else {
  560. out = append(out, []byte("$"+strconv.Itoa(j))...)
  561. j++
  562. }
  563. }
  564. i++
  565. }
  566. out = append(out, buf[k:i]...)
  567. return string(out)
  568. }
  569. func copySettings(from BaseDatabase, into BaseDatabase) {
  570. into.SetLogging(from.LoggingEnabled())
  571. into.SetLogger(from.Logger())
  572. into.SetPreparedStatementCache(from.PreparedStatementCacheEnabled())
  573. into.SetConnMaxLifetime(from.ConnMaxLifetime())
  574. into.SetMaxIdleConns(from.MaxIdleConns())
  575. into.SetMaxOpenConns(from.MaxOpenConns())
  576. txOptions := from.TxOptions()
  577. if txOptions != nil {
  578. into.SetTxOptions(*txOptions)
  579. }
  580. }
  581. func newSessionID() uint64 {
  582. if atomic.LoadUint64(&lastSessID) == math.MaxUint64 {
  583. atomic.StoreUint64(&lastSessID, 0)
  584. return 0
  585. }
  586. return atomic.AddUint64(&lastSessID, 1)
  587. }
  588. func newBaseTxID() uint64 {
  589. if atomic.LoadUint64(&lastTxID) == math.MaxUint64 {
  590. atomic.StoreUint64(&lastTxID, 0)
  591. return 0
  592. }
  593. return atomic.AddUint64(&lastTxID, 1)
  594. }