package svc // import "github.com/getwtxt/getwtxt/svc"
import (
"database/sql"
"net"
"strings"
"time"
"github.com/getwtxt/registry"
_ "github.com/mattn/go-sqlite3" // for the sqlite3 driver
"github.com/syndtr/goleveldb/leveldb"
)
type dbase interface {
push() error
pull()
}
type dbLevel struct {
db *leveldb.DB
}
type dbSqlite struct {
db *sql.DB
pullStmt *sql.Stmt
pushStmt *sql.Stmt
}
// Pull DB data into cache, if available.
func initDatabase() {
var db dbase
confObj.Mu.RLock()
switch confObj.DBType {
case "leveldb":
lvl, err := leveldb.OpenFile(confObj.DBPath, nil)
errFatal("", err)
db = &dbLevel{db: lvl}
case "sqlite":
db = initSqlite()
}
confObj.Mu.RUnlock()
dbChan <- db
pullDB()
}
func dbTimer() bool {
confObj.Mu.RLock()
answer := time.Since(confObj.LastPush) > confObj.DBInterval
confObj.Mu.RUnlock()
return answer
}
func initSqlite() *dbSqlite {
lite, err := sql.Open("sqlite3", confObj.DBPath)
errFatal("Error opening sqlite3 DB: ", err)
litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)")
errFatal("Error preparing sqlite3 DB: ", err)
_, err = litePrep.Exec()
errFatal("Error creating sqlite3 DB: ", err)
push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)")
errFatal("", err)
pull, err := lite.Prepare("SELECT * FROM getwtxt")
errFatal("", err)
return &dbSqlite{
db: lite,
pushStmt: push,
pullStmt: pull,
}
}
// Pushes the registry's cache data to a local
// database for safe keeping.
func pushDB() error {
db := <-dbChan
err := db.push()
dbChan <- db
return err
}
func pullDB() {
db := <-dbChan
db.pull()
dbChan <- db
}
func (lvl dbLevel) push() error {
twtxtCache.Mu.RLock()
var dbBasket = &leveldb.Batch{}
for k, v := range twtxtCache.Users {
dbBasket.Put([]byte(k+"*Nick"), []byte(v.Nick))
dbBasket.Put([]byte(k+"*URL"), []byte(v.URL))
dbBasket.Put([]byte(k+"*IP"), []byte(v.IP.String()))
dbBasket.Put([]byte(k+"*Date"), []byte(v.Date))
dbBasket.Put([]byte(k+"*RLen"), []byte(v.RLen))
for i, e := range v.Status {
rfc := i.Format(time.RFC3339)
dbBasket.Put([]byte(k+"*Status*"+rfc), []byte(e))
}
}
twtxtCache.Mu.RUnlock()
remoteRegistries.Mu.RLock()
for k, v := range remoteRegistries.List {
dbBasket.Put([]byte("remote*"+string(k)), []byte(v))
}
remoteRegistries.Mu.RUnlock()
confObj.Mu.Lock()
confObj.LastPush = time.Now()
confObj.Mu.Unlock()
return lvl.db.Write(dbBasket, nil)
}
func (lvl dbLevel) pull() {
iter := lvl.db.NewIterator(nil, nil)
for iter.Next() {
key := string(iter.Key())
val := string(iter.Value())
split := strings.Split(key, "*")
urls := split[0]
field := split[1]
if urls == "remote" {
remoteRegistries.Mu.Lock()
remoteRegistries.List = append(remoteRegistries.List, val)
remoteRegistries.Mu.Unlock()
continue
}
data := registry.NewUser()
twtxtCache.Mu.RLock()
if _, ok := twtxtCache.Users[urls]; ok {
twtxtCache.Users[urls].Mu.RLock()
data = twtxtCache.Users[urls]
twtxtCache.Users[urls].Mu.RUnlock()
}
twtxtCache.Mu.RUnlock()
data.Mu.Lock()
switch field {
case "IP":
data.IP = net.ParseIP(val)
case "Nick":
data.Nick = val
case "URL":
data.URL = val
case "RLen":
data.RLen = val
case "Date":
data.Date = val
case "Status":
thetime, err := time.Parse(time.RFC3339, split[2])
errLog("", err)
data.Status[thetime] = val
}
data.Mu.Unlock()
twtxtCache.Mu.Lock()
twtxtCache.Users[urls] = data
twtxtCache.Mu.Unlock()
}
remoteRegistries.Mu.Lock()
remoteRegistries.List = dedupe(remoteRegistries.List)
remoteRegistries.Mu.Unlock()
iter.Release()
errLog("Error while pulling DB into registry cache: ", iter.Error())
}
func (lite dbSqlite) push() error {
err := lite.db.Ping()
if err != nil {
return err
}
twtxtCache.Mu.RLock()
for i, e := range twtxtCache.Users {
e.Mu.RLock()
_, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick)
errLog("", err)
_, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen)
errLog("", err)
_, err = lite.pushStmt.Exec(i, true, "uip", e.IP)
errLog("", err)
_, err = lite.pushStmt.Exec(i, true, "date", e.Date)
errLog("", err)
for k, v := range e.Status {
_, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v)
errLog("", err)
}
e.Mu.RUnlock()
}
twtxtCache.Mu.RUnlock()
remoteRegistries.Mu.RLock()
for _, e := range remoteRegistries.List {
_, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL")
errLog("", err)
}
remoteRegistries.Mu.RUnlock()
return nil
}
func (lite dbSqlite) pull() {
errLog("Error pinging sqlite DB: ", lite.db.Ping())
rows, err := lite.pullStmt.Query()
errLog("", err)
twtxtCache.Mu.Lock()
for rows.Next() {
var urls string
var isUser bool
var dataKey string
var dBlob []byte
errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob))
if !isUser {
remoteRegistries.Mu.Lock()
remoteRegistries.List = append(remoteRegistries.List, urls)
remoteRegistries.Mu.Unlock()
continue
}
user := registry.NewUser()
if _, ok := twtxtCache.Users[urls]; ok {
user = twtxtCache.Users[urls]
}
user.Mu.Lock()
switch dataKey {
case "nickname":
user.Nick = string(dBlob)
case "uip":
user.IP = net.ParseIP(string(dBlob))
case "date":
user.Date = string(dBlob)
case "rlen":
user.RLen = string(dBlob)
default:
thetime, err := time.Parse(time.RFC3339, dataKey)
errLog("While pulling statuses from SQLite: ", err)
user.Status[thetime] = string(dBlob)
}
twtxtCache.Users[urls] = user
user.Mu.Unlock()
}
twtxtCache.Mu.Unlock()
remoteRegistries.Mu.Lock()
remoteRegistries.List = dedupe(remoteRegistries.List)
remoteRegistries.Mu.Unlock()
}