diff options
Diffstat (limited to 'svc')
-rw-r--r-- | svc/conf.go | 24 | ||||
-rw-r--r-- | svc/db.go | 229 | ||||
-rw-r--r-- | svc/init.go | 4 | ||||
-rw-r--r-- | svc/leveldb.go | 106 | ||||
-rw-r--r-- | svc/sqlite.go | 148 |
5 files changed, 273 insertions, 238 deletions
diff --git a/svc/conf.go b/svc/conf.go index 0aaee14..9099477 100644 --- a/svc/conf.go +++ b/svc/conf.go @@ -135,22 +135,19 @@ func initConfig() { confObj.Port = viper.GetInt("ListenPort") confObj.LogFile = viper.GetString("LogFile") - if *flagDBType == "" { - confObj.DBType = strings.ToLower(viper.GetString("DatabaseType")) - } else { + confObj.DBType = strings.ToLower(viper.GetString("DatabaseType")) + if *flagDBType != "" { confObj.DBType = *flagDBType } - if *flagDBPath == "" { - confObj.DBPath = viper.GetString("DatabasePath") - } else { + confObj.DBPath = viper.GetString("DatabasePath") + if *flagDBPath != "" { confObj.DBPath = *flagDBPath } log.Printf("Using %v database: %v\n", confObj.DBType, confObj.DBPath) - if *flagAssets == "" { - confObj.AssetsDir = viper.GetString("AssetsDirectory") - } else { + confObj.AssetsDir = viper.GetString("AssetsDirectory") + if *flagAssets != "" { confObj.AssetsDir = *flagAssets } @@ -192,9 +189,16 @@ func rebindConfig() { confObj.Mu.Lock() - confObj.LogFile = viper.GetString("LogFile") confObj.DBType = strings.ToLower(viper.GetString("DatabaseType")) + if *flagDBType != "" { + confObj.DBType = *flagDBType + } + + confObj.LogFile = viper.GetString("LogFile") confObj.DBPath = viper.GetString("DatabasePath") + if *flagDBPath != "" { + confObj.DBPath = *flagDBPath + } confObj.StdoutLogging = viper.GetBool("StdoutLogging") confObj.CacheInterval = viper.GetDuration("StatusFetchInterval") confObj.DBInterval = viper.GetDuration("DatabasePushInterval") diff --git a/svc/db.go b/svc/db.go index 36c682b..ce20ed0 100644 --- a/svc/db.go +++ b/svc/db.go @@ -1,13 +1,8 @@ package svc // import "github.com/getwtxt/getwtxt/svc" import ( - "database/sql" - "net" - "strings" "time" - "github.com/getwtxt/registry" - _ "github.com/mattn/go-sqlite3" // for the sqlite3 driver "github.com/syndtr/goleveldb/leveldb" ) @@ -16,25 +11,17 @@ type dbase interface { pull() } -type dbLevel struct { - db *leveldb.DB -} - -type dbSqlite struct { - db *sql.DB - pullStmt *sql.Stmt - pushStmt *sql.Stmt -} - // Pull DB data into cache, if available. func initDatabase() { var db dbase - confObj.Mu.RLock() + dbpath := confObj.DBPath + confObj.Mu.RUnlock() + switch confObj.DBType { case "leveldb": - lvl, err := leveldb.OpenFile(confObj.DBPath, nil) + lvl, err := leveldb.OpenFile(dbpath, nil) errFatal("", err) db = &dbLevel{db: lvl} @@ -42,7 +29,6 @@ func initDatabase() { db = initSqlite() } - confObj.Mu.RUnlock() dbChan <- db pullDB() @@ -56,30 +42,6 @@ func dbTimer() bool { return answer } -func initSqlite() *dbSqlite { - - lite, err := sql.Open("sqlite3", confObj.DBPath) - errFatal("Error opening sqlite3 DB: ", err) - - litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)") - errFatal("Error preparing sqlite3 DB: ", err) - - _, err = litePrep.Exec() - errFatal("Error creating sqlite3 DB: ", err) - - push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)") - errFatal("", err) - - pull, err := lite.Prepare("SELECT * FROM getwtxt") - errFatal("", err) - - return &dbSqlite{ - db: lite, - pushStmt: push, - pullStmt: pull, - } -} - // Pushes the registry's cache data to a local // database for safe keeping. func pushDB() error { @@ -95,186 +57,3 @@ func pullDB() { db.pull() dbChan <- db } - -func (lvl dbLevel) push() error { - - twtxtCache.Mu.RLock() - var dbBasket = &leveldb.Batch{} - for k, v := range twtxtCache.Users { - - dbBasket.Put([]byte(k+"*Nick"), []byte(v.Nick)) - dbBasket.Put([]byte(k+"*URL"), []byte(v.URL)) - dbBasket.Put([]byte(k+"*IP"), []byte(v.IP.String())) - dbBasket.Put([]byte(k+"*Date"), []byte(v.Date)) - dbBasket.Put([]byte(k+"*RLen"), []byte(v.RLen)) - - for i, e := range v.Status { - rfc := i.Format(time.RFC3339) - dbBasket.Put([]byte(k+"*Status*"+rfc), []byte(e)) - } - } - twtxtCache.Mu.RUnlock() - - remoteRegistries.Mu.RLock() - for k, v := range remoteRegistries.List { - dbBasket.Put([]byte("remote*"+string(k)), []byte(v)) - } - remoteRegistries.Mu.RUnlock() - - confObj.Mu.Lock() - confObj.LastPush = time.Now() - confObj.Mu.Unlock() - - return lvl.db.Write(dbBasket, nil) -} - -func (lvl dbLevel) pull() { - - iter := lvl.db.NewIterator(nil, nil) - - for iter.Next() { - key := string(iter.Key()) - val := string(iter.Value()) - - split := strings.Split(key, "*") - urls := split[0] - field := split[1] - - if urls == "remote" { - remoteRegistries.Mu.Lock() - remoteRegistries.List = append(remoteRegistries.List, val) - remoteRegistries.Mu.Unlock() - continue - } - - data := registry.NewUser() - twtxtCache.Mu.RLock() - if _, ok := twtxtCache.Users[urls]; ok { - twtxtCache.Users[urls].Mu.RLock() - data = twtxtCache.Users[urls] - twtxtCache.Users[urls].Mu.RUnlock() - } - twtxtCache.Mu.RUnlock() - - data.Mu.Lock() - switch field { - case "IP": - data.IP = net.ParseIP(val) - case "Nick": - data.Nick = val - case "URL": - data.URL = val - case "RLen": - data.RLen = val - case "Date": - data.Date = val - case "Status": - thetime, err := time.Parse(time.RFC3339, split[2]) - errLog("", err) - data.Status[thetime] = val - } - data.Mu.Unlock() - - twtxtCache.Mu.Lock() - twtxtCache.Users[urls] = data - twtxtCache.Mu.Unlock() - } - - remoteRegistries.Mu.Lock() - remoteRegistries.List = dedupe(remoteRegistries.List) - remoteRegistries.Mu.Unlock() - - iter.Release() - errLog("Error while pulling DB into registry cache: ", iter.Error()) -} - -func (lite dbSqlite) push() error { - err := lite.db.Ping() - if err != nil { - return err - } - - twtxtCache.Mu.RLock() - for i, e := range twtxtCache.Users { - e.Mu.RLock() - - _, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick) - errLog("", err) - _, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen) - errLog("", err) - _, err = lite.pushStmt.Exec(i, true, "uip", e.IP) - errLog("", err) - _, err = lite.pushStmt.Exec(i, true, "date", e.Date) - errLog("", err) - - for k, v := range e.Status { - _, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v) - errLog("", err) - } - - e.Mu.RUnlock() - } - twtxtCache.Mu.RUnlock() - - remoteRegistries.Mu.RLock() - for _, e := range remoteRegistries.List { - _, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL") - errLog("", err) - } - remoteRegistries.Mu.RUnlock() - - return nil -} - -func (lite dbSqlite) pull() { - errLog("Error pinging sqlite DB: ", lite.db.Ping()) - - rows, err := lite.pullStmt.Query() - errLog("", err) - - twtxtCache.Mu.Lock() - for rows.Next() { - var urls string - var isUser bool - var dataKey string - var dBlob []byte - - errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob)) - - if !isUser { - remoteRegistries.Mu.Lock() - remoteRegistries.List = append(remoteRegistries.List, urls) - remoteRegistries.Mu.Unlock() - continue - } - - user := registry.NewUser() - if _, ok := twtxtCache.Users[urls]; ok { - user = twtxtCache.Users[urls] - } - user.Mu.Lock() - - switch dataKey { - case "nickname": - user.Nick = string(dBlob) - case "uip": - user.IP = net.ParseIP(string(dBlob)) - case "date": - user.Date = string(dBlob) - case "rlen": - user.RLen = string(dBlob) - default: - thetime, err := time.Parse(time.RFC3339, dataKey) - errLog("While pulling statuses from SQLite: ", err) - user.Status[thetime] = string(dBlob) - } - - twtxtCache.Users[urls] = user - user.Mu.Unlock() - } - twtxtCache.Mu.Unlock() - - remoteRegistries.Mu.Lock() - remoteRegistries.List = dedupe(remoteRegistries.List) - remoteRegistries.Mu.Unlock() -} diff --git a/svc/init.go b/svc/init.go index 7db5303..b688e5d 100644 --- a/svc/init.go +++ b/svc/init.go @@ -1,7 +1,6 @@ package svc // import "github.com/getwtxt/getwtxt/svc" import ( - "html/template" "log" "os" "os/signal" @@ -32,7 +31,7 @@ var closeLog = make(chan bool, 1) // initialization var dbChan = make(chan dbase, 1) -var tmpls *template.Template +var tmpls = initTemplates() var twtxtCache = registry.NewIndex() @@ -60,7 +59,6 @@ func initSvc() { titleScreen() initConfig() initLogging() - tmpls = initTemplates() initDatabase() go cacheAndPush() watchForInterrupt() diff --git a/svc/leveldb.go b/svc/leveldb.go new file mode 100644 index 0000000..455d8b9 --- /dev/null +++ b/svc/leveldb.go @@ -0,0 +1,106 @@ +package svc // import "github.com/getwtxt/getwtxt/svc" + +import ( + "net" + "strings" + "time" + + "github.com/getwtxt/registry" + "github.com/syndtr/goleveldb/leveldb" +) + +type dbLevel struct { + db *leveldb.DB +} + +func (lvl dbLevel) push() error { + + twtxtCache.Mu.RLock() + var dbBasket = &leveldb.Batch{} + for k, v := range twtxtCache.Users { + + dbBasket.Put([]byte(k+"*Nick"), []byte(v.Nick)) + dbBasket.Put([]byte(k+"*URL"), []byte(v.URL)) + dbBasket.Put([]byte(k+"*IP"), []byte(v.IP.String())) + dbBasket.Put([]byte(k+"*Date"), []byte(v.Date)) + dbBasket.Put([]byte(k+"*RLen"), []byte(v.RLen)) + + for i, e := range v.Status { + rfc := i.Format(time.RFC3339) + dbBasket.Put([]byte(k+"*Status*"+rfc), []byte(e)) + } + } + twtxtCache.Mu.RUnlock() + + remoteRegistries.Mu.RLock() + for k, v := range remoteRegistries.List { + dbBasket.Put([]byte("remote*"+string(k)), []byte(v)) + } + remoteRegistries.Mu.RUnlock() + + confObj.Mu.Lock() + confObj.LastPush = time.Now() + confObj.Mu.Unlock() + + return lvl.db.Write(dbBasket, nil) +} + +func (lvl dbLevel) pull() { + + iter := lvl.db.NewIterator(nil, nil) + + for iter.Next() { + key := string(iter.Key()) + val := string(iter.Value()) + + split := strings.Split(key, "*") + urls := split[0] + field := split[1] + + if urls == "remote" { + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, val) + remoteRegistries.Mu.Unlock() + continue + } + + data := registry.NewUser() + twtxtCache.Mu.RLock() + if _, ok := twtxtCache.Users[urls]; ok { + twtxtCache.Users[urls].Mu.RLock() + data = twtxtCache.Users[urls] + twtxtCache.Users[urls].Mu.RUnlock() + } + twtxtCache.Mu.RUnlock() + + data.Mu.Lock() + switch field { + case "IP": + data.IP = net.ParseIP(val) + case "Nick": + data.Nick = val + case "URL": + data.URL = val + case "RLen": + data.RLen = val + case "Date": + data.Date = val + case "Status": + thetime, err := time.Parse(time.RFC3339, split[2]) + errLog("", err) + data.Status[thetime] = val + } + data.Mu.Unlock() + + twtxtCache.Mu.Lock() + twtxtCache.Users[urls] = data + twtxtCache.Mu.Unlock() + } + + remoteRegistries.Mu.Lock() + remoteRegistries.List = dedupe(remoteRegistries.List) + remoteRegistries.Mu.Unlock() + + iter.Release() + errLog("Error while pulling DB into registry cache: ", iter.Error()) +} diff --git a/svc/sqlite.go b/svc/sqlite.go new file mode 100644 index 0000000..3f50402 --- /dev/null +++ b/svc/sqlite.go @@ -0,0 +1,148 @@ +package svc // import "github.com/getwtxt/getwtxt/svc" + +import ( + "database/sql" + "net" + "time" + + "github.com/getwtxt/registry" + _ "github.com/mattn/go-sqlite3" // for the sqlite3 driver +) + +type dbSqlite struct { + db *sql.DB + pullStmt *sql.Stmt + pushStmt *sql.Stmt +} + +func initSqlite() *dbSqlite { + + confObj.Mu.RLock() + dbpath := confObj.DBPath + confObj.Mu.RUnlock() + + lite, err := sql.Open("sqlite3", dbpath) + errFatal("Error opening sqlite3 DB: ", err) + + errFatal("", lite.Ping()) + + _, err = lite.Exec("CREATE TABLE IF NOT EXISTS getwtxt (id INTEGER PRIMARY KEY, urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)") + errFatal("Error preparing sqlite3 DB: ", err) + + push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt (urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)") + errFatal("", err) + + pull, err := lite.Prepare("SELECT * FROM getwtxt") + errFatal("", err) + + return &dbSqlite{ + db: lite, + pushStmt: push, + pullStmt: pull, + } +} + +func (lite dbSqlite) push() error { + err := lite.db.Ping() + if err != nil { + return err + } + + tx, err := lite.db.Begin() + errLog("", err) + txst := tx.Stmt(lite.pushStmt) + + twtxtCache.Mu.RLock() + for i, e := range twtxtCache.Users { + e.Mu.RLock() + + _, err = txst.Exec(i, true, "nickname", e.Nick) + errLog("", err) + _, err = txst.Exec(i, true, "rlen", e.RLen) + errLog("", err) + _, err = txst.Exec(i, true, "uip", e.IP) + errLog("", err) + _, err = txst.Exec(i, true, "date", e.Date) + errLog("", err) + + for k, v := range e.Status { + _, err = txst.Exec(i, true, k.Format(time.RFC3339), v) + errLog("", err) + } + + e.Mu.RUnlock() + } + twtxtCache.Mu.RUnlock() + + remoteRegistries.Mu.RLock() + for _, e := range remoteRegistries.List { + _, err = txst.Exec(e, false, "REMOTE REGISTRY", "NULL") + errLog("", err) + } + remoteRegistries.Mu.RUnlock() + + err = tx.Commit() + if err != nil { + tx.Rollback() + return err + } + + return nil +} + +func (lite dbSqlite) pull() { + errLog("Error pinging sqlite DB: ", lite.db.Ping()) + + rows, err := lite.pullStmt.Query() + errLog("", err) + + defer func(rows *sql.Rows) { + errLog("Error while finalizing DB Pull: ", rows.Close()) + }(rows) + + twtxtCache.Mu.Lock() + for rows.Next() { + var urls string + var isUser bool + var dataKey string + var dBlob []byte + + errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob)) + + if !isUser { + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, urls) + remoteRegistries.Mu.Unlock() + continue + } + + user := registry.NewUser() + if _, ok := twtxtCache.Users[urls]; ok { + user = twtxtCache.Users[urls] + } + user.Mu.Lock() + + switch dataKey { + case "nickname": + user.Nick = string(dBlob) + case "uip": + user.IP = net.ParseIP(string(dBlob)) + case "date": + user.Date = string(dBlob) + case "rlen": + user.RLen = string(dBlob) + default: + thetime, err := time.Parse(time.RFC3339, dataKey) + errLog("While pulling statuses from SQLite: ", err) + user.Status[thetime] = string(dBlob) + } + + twtxtCache.Users[urls] = user + user.Mu.Unlock() + } + twtxtCache.Mu.Unlock() + + remoteRegistries.Mu.Lock() + remoteRegistries.List = dedupe(remoteRegistries.List) + remoteRegistries.Mu.Unlock() +} |