diff options
Diffstat (limited to 'svc/db.go')
-rw-r--r-- | svc/db.go | 149 |
1 files changed, 115 insertions, 34 deletions
diff --git a/svc/db.go b/svc/db.go index 6a614f1..36c682b 100644 --- a/svc/db.go +++ b/svc/db.go @@ -21,47 +21,30 @@ type dbLevel struct { } type dbSqlite struct { - db *sql.DB -} - -type dbPostgres struct { - db *sql.DB + db *sql.DB + pullStmt *sql.Stmt + pushStmt *sql.Stmt } // Pull DB data into cache, if available. func initDatabase() { var db dbase - var err error confObj.Mu.RLock() switch confObj.DBType { case "leveldb": - var lvl *leveldb.DB - lvl, err = leveldb.OpenFile(confObj.DBPath, nil) + lvl, err := leveldb.OpenFile(confObj.DBPath, nil) + errFatal("", err) db = &dbLevel{db: lvl} case "sqlite": - var lite *sql.DB - lite, err := sql.Open("sqlite3", confObj.DBPath) - errFatal("Error opening sqlite3 DB: ", err) - litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT PRIMARY KEY, isUser BOOL, blobKey TEXT, data BLOB)") - errFatal("Error preparing sqlite3 DB: ", err) - _, err = litePrep.Exec() - errFatal("Error creating sqlite3 DB: ", err) - db = &dbSqlite{db: lite} - - case "postgres": - var pg *sql.DB - db = &dbPostgres{db: pg} + db = initSqlite() } confObj.Mu.RUnlock() - errFatal("", err) - dbChan <- db - pullDB() } @@ -73,6 +56,30 @@ func dbTimer() bool { return answer } +func initSqlite() *dbSqlite { + + lite, err := sql.Open("sqlite3", confObj.DBPath) + errFatal("Error opening sqlite3 DB: ", err) + + litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)") + errFatal("Error preparing sqlite3 DB: ", err) + + _, err = litePrep.Exec() + errFatal("Error creating sqlite3 DB: ", err) + + push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)") + errFatal("", err) + + pull, err := lite.Prepare("SELECT * FROM getwtxt") + errFatal("", err) + + return &dbSqlite{ + db: lite, + pushStmt: push, + pullStmt: pull, + } +} + // Pushes the registry's cache data to a local // database for safe keeping. func pushDB() error { @@ -90,6 +97,7 @@ func pullDB() { } func (lvl dbLevel) push() error { + twtxtCache.Mu.RLock() var dbBasket = &leveldb.Batch{} for k, v := range twtxtCache.Users { @@ -117,9 +125,7 @@ func (lvl dbLevel) push() error { confObj.LastPush = time.Now() confObj.Mu.Unlock() - err := lvl.db.Write(dbBasket, nil) - - return err + return lvl.db.Write(dbBasket, nil) } func (lvl dbLevel) pull() { @@ -144,10 +150,13 @@ func (lvl dbLevel) pull() { data := registry.NewUser() twtxtCache.Mu.RLock() if _, ok := twtxtCache.Users[urls]; ok { + twtxtCache.Users[urls].Mu.RLock() data = twtxtCache.Users[urls] + twtxtCache.Users[urls].Mu.RUnlock() } twtxtCache.Mu.RUnlock() + data.Mu.Lock() switch field { case "IP": data.IP = net.ParseIP(val) @@ -164,11 +173,11 @@ func (lvl dbLevel) pull() { errLog("", err) data.Status[thetime] = val } + data.Mu.Unlock() twtxtCache.Mu.Lock() twtxtCache.Users[urls] = data twtxtCache.Mu.Unlock() - } remoteRegistries.Mu.Lock() @@ -176,24 +185,96 @@ func (lvl dbLevel) pull() { remoteRegistries.Mu.Unlock() iter.Release() - err := iter.Error() - errLog("Error while pulling DB into registry cache: ", err) + errLog("Error while pulling DB into registry cache: ", iter.Error()) } func (lite dbSqlite) push() error { + err := lite.db.Ping() + if err != nil { + return err + } + + twtxtCache.Mu.RLock() + for i, e := range twtxtCache.Users { + e.Mu.RLock() + + _, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "uip", e.IP) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "date", e.Date) + errLog("", err) + + for k, v := range e.Status { + _, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v) + errLog("", err) + } + + e.Mu.RUnlock() + } + twtxtCache.Mu.RUnlock() + + remoteRegistries.Mu.RLock() + for _, e := range remoteRegistries.List { + _, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL") + errLog("", err) + } + remoteRegistries.Mu.RUnlock() return nil } func (lite dbSqlite) pull() { + errLog("Error pinging sqlite DB: ", lite.db.Ping()) -} + rows, err := lite.pullStmt.Query() + errLog("", err) -func (pg dbPostgres) push() error { + twtxtCache.Mu.Lock() + for rows.Next() { + var urls string + var isUser bool + var dataKey string + var dBlob []byte - return nil -} + errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob)) + + if !isUser { + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, urls) + remoteRegistries.Mu.Unlock() + continue + } + + user := registry.NewUser() + if _, ok := twtxtCache.Users[urls]; ok { + user = twtxtCache.Users[urls] + } + user.Mu.Lock() + + switch dataKey { + case "nickname": + user.Nick = string(dBlob) + case "uip": + user.IP = net.ParseIP(string(dBlob)) + case "date": + user.Date = string(dBlob) + case "rlen": + user.RLen = string(dBlob) + default: + thetime, err := time.Parse(time.RFC3339, dataKey) + errLog("While pulling statuses from SQLite: ", err) + user.Status[thetime] = string(dBlob) + } -func (pg dbPostgres) pull() { + twtxtCache.Users[urls] = user + user.Mu.Unlock() + } + twtxtCache.Mu.Unlock() + remoteRegistries.Mu.Lock() + remoteRegistries.List = dedupe(remoteRegistries.List) + remoteRegistries.Mu.Unlock() } |