summary refs log blame commit diff stats
path: root/svc/db.go
blob: 36c682b23c7936a63d3029917cffd89603ad8b52 (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                      

        
                      




                                     
                                                                 


                                             




                      




                      


                          

 


                                         




                               

                                                                 


                                      
                                 
 


                            
                    
                

 
                     






                                                                   























                                                                                                                               

                                              
                     
                      
                        

                    
                  

 
               
                      
                 
                    


                                 
 


                                            
 



                                                                    

                                                               












                                                                         



                                     
                                          























                                                                                  
                                                         
                                                     
                                                           


                                       
                              






                                                  

                                       



                                                                          
                                       

                                                  
                                



                                             






                                                             
                                                                            


                                   
































                                                                                       




                             
                                                           
 

                                          
 





                                  
 




























                                                                                   
 



                                             
 


                                                             
 
package svc // import "github.com/getwtxt/getwtxt/svc"

import (
	"database/sql"
	"net"
	"strings"
	"time"

	"github.com/getwtxt/registry"
	_ "github.com/mattn/go-sqlite3" // for the sqlite3 driver
	"github.com/syndtr/goleveldb/leveldb"
)

// dbase is the interface this file uses to talk to a storage backend.
// dbLevel and dbSqlite both provide these methods.
type dbase interface {
	// push writes the cached data to the backend and reports any error.
	push() error
	// pull reads stored data from the backend. It returns nothing, so
	// implementations report problems by logging them.
	pull()
}

// dbLevel is the LevelDB-backed implementation of dbase.
type dbLevel struct {
	// db is the open LevelDB handle (opened in initDatabase).
	db *leveldb.DB
}

// dbSqlite is the SQLite-backed implementation of dbase. All three fields
// are populated by initSqlite.
type dbSqlite struct {
	// db is the open database handle.
	db *sql.DB
	// pullStmt is the prepared SELECT used by pull.
	pullStmt *sql.Stmt
	// pushStmt is the prepared INSERT OR REPLACE used by push.
	pushStmt *sql.Stmt
}

// initDatabase opens the backend selected by confObj.DBType ("leveldb" or
// "sqlite"), hands the resulting handle to dbChan, and then calls pullDB to
// load whatever the database holds into the cache.
//
// NOTE(review): when DBType matches neither value the handle stays nil and
// is still sent on dbChan; pullDB would then call a method on a nil
// interface. Confirm that DBType is validated before this point.
func initDatabase() {
	var store dbase

	// The read lock covers both DBType and DBPath (initSqlite reads DBPath).
	confObj.Mu.RLock()
	if kind := confObj.DBType; kind == "leveldb" {
		handle, err := leveldb.OpenFile(confObj.DBPath, nil)
		errFatal("", err)
		store = &dbLevel{db: handle}
	} else if kind == "sqlite" {
		store = initSqlite()
	}
	confObj.Mu.RUnlock()

	dbChan <- store
	pullDB()
}

// dbTimer reports whether more time than confObj.DBInterval has passed
// since confObj.LastPush. Both values are read under the config read lock.
func dbTimer() bool {
	confObj.Mu.RLock()
	defer confObj.Mu.RUnlock()

	elapsed := time.Since(confObj.LastPush)
	return elapsed > confObj.DBInterval
}

// initSqlite opens the sqlite3 database at confObj.DBPath, makes sure the
// getwtxt table exists, and prepares the statements that push and pull use.
// Every failure along the way is handed to errFatal.
//
// It reads confObj.DBPath without locking; initDatabase calls it while
// holding the config read lock.
func initSqlite() *dbSqlite {

	lite, err := sql.Open("sqlite3", confObj.DBPath)
	errFatal("Error opening sqlite3 DB: ", err)

	// The UNIQUE (urlKey, dataKey) constraint is what INSERT OR REPLACE
	// needs in order to replace anything: without a uniqueness constraint
	// no insert ever conflicts, so each push would append a fresh copy of
	// every row. Because of IF NOT EXISTS this only affects newly created
	// databases; an existing table keeps its old, unconstrained schema.
	litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB, UNIQUE (urlKey, dataKey))")
	errFatal("Error preparing sqlite3 DB: ", err)

	_, err = litePrep.Exec()
	errFatal("Error creating sqlite3 DB: ", err)

	// The CREATE TABLE statement is used exactly once, so release it here
	// instead of leaking it. A failure to close it is not actionable and
	// does not affect the statements prepared below.
	_ = litePrep.Close()

	push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)")
	errFatal("", err)

	pull, err := lite.Prepare("SELECT * FROM getwtxt")
	errFatal("", err)

	return &dbSqlite{
		db:       lite,
		pushStmt: push,
		pullStmt: pull,
	}
}

// pushDB takes the database handle off dbChan, asks it to persist the
// cached data via push, and puts the handle back on the channel.
// It returns whatever error push reported.
func pushDB() error {
	store := <-dbChan
	result := store.push()
	dbChan <- store

	return result
}

// pullDB takes the database handle off dbChan, has it load its stored
// contents via pull, and then puts the handle back on the channel.
func pullDB() {
	store := <-dbChan
	store.pull()
	dbChan <- store
}

// push writes the registry cache and the remote registry list to LevelDB
// as a single batch.
//
// Key layout:
//   - "<url>*Nick", "<url>*URL", "<url>*IP", "<url>*Date", "<url>*RLen"
//     hold the user's fields;
//   - "<url>*Status*<RFC3339 time>" holds one status;
//   - "remote*<index>" holds one remote registry entry.
//
// confObj.LastPush is set to the current time before the batch is written,
// whether or not the write succeeds. The returned error is the one from the
// batch write.
func (lvl dbLevel) push() error {
	dbBasket := &leveldb.Batch{}

	twtxtCache.Mu.RLock()
	for k, v := range twtxtCache.Users {
		// Hold the user's own read lock while its fields are copied, the
		// same way dbSqlite.push does; the cache lock alone does not
		// protect the fields of an individual user.
		v.Mu.RLock()

		dbBasket.Put([]byte(k+"*Nick"), []byte(v.Nick))
		dbBasket.Put([]byte(k+"*URL"), []byte(v.URL))
		dbBasket.Put([]byte(k+"*IP"), []byte(v.IP.String()))
		dbBasket.Put([]byte(k+"*Date"), []byte(v.Date))
		dbBasket.Put([]byte(k+"*RLen"), []byte(v.RLen))

		for i, e := range v.Status {
			rfc := i.Format(time.RFC3339)
			dbBasket.Put([]byte(k+"*Status*"+rfc), []byte(e))
		}

		v.Mu.RUnlock()
	}
	twtxtCache.Mu.RUnlock()

	remoteRegistries.Mu.RLock()
	for k, v := range remoteRegistries.List {
		// The index is encoded as a single rune, not as decimal digits.
		// The conversion is spelled out (go vet rejects string(int)) and
		// produces the same bytes as before, so keys already on disk keep
		// being overwritten rather than duplicated.
		dbBasket.Put([]byte("remote*"+string(rune(k))), []byte(v))
	}
	remoteRegistries.Mu.RUnlock()

	confObj.Mu.Lock()
	confObj.LastPush = time.Now()
	confObj.Mu.Unlock()

	return lvl.db.Write(dbBasket, nil)
}

// pull iterates over every key in LevelDB and loads it into the registry
// cache or the remote registry list.
//
// Keys are split on "*": the first part is the user URL (or the literal
// "remote" for a remote registry entry), the second is the field name, and
// for "Status" keys the third is the RFC3339 timestamp. Keys that do not
// have the expected number of parts, and statuses whose timestamp does not
// parse, are skipped instead of being loaded. Iterator errors are logged.
//
// NOTE(review): a user URL that itself contains "*" will be split in the
// wrong place; the key format written by push does not escape it.
func (lvl dbLevel) pull() {

	iter := lvl.db.NewIterator(nil, nil)

	for iter.Next() {
		key := string(iter.Key())
		val := string(iter.Value())

		split := strings.Split(key, "*")
		if len(split) < 2 {
			// No separator at all: not a key written by push.
			continue
		}
		urls, field := split[0], split[1]

		if urls == "remote" {
			remoteRegistries.Mu.Lock()
			remoteRegistries.List = append(remoteRegistries.List, val)
			remoteRegistries.Mu.Unlock()
			continue
		}

		// Validate a status timestamp before touching the cache so that a
		// bad key neither panics on split[2] nor files the status under
		// the zero time.
		var thetime time.Time
		if field == "Status" {
			if len(split) < 3 {
				continue
			}
			parsed, err := time.Parse(time.RFC3339, split[2])
			if err != nil {
				errLog("", err)
				continue
			}
			thetime = parsed
		}

		// Reuse the cached user when one exists; only allocate otherwise.
		twtxtCache.Mu.RLock()
		data, ok := twtxtCache.Users[urls]
		twtxtCache.Mu.RUnlock()
		if !ok {
			data = registry.NewUser()
		}

		data.Mu.Lock()
		switch field {
		case "IP":
			data.IP = net.ParseIP(val)
		case "Nick":
			data.Nick = val
		case "URL":
			data.URL = val
		case "RLen":
			data.RLen = val
		case "Date":
			data.Date = val
		case "Status":
			data.Status[thetime] = val
		}
		data.Mu.Unlock()

		twtxtCache.Mu.Lock()
		twtxtCache.Users[urls] = data
		twtxtCache.Mu.Unlock()
	}

	remoteRegistries.Mu.Lock()
	remoteRegistries.List = dedupe(remoteRegistries.List)
	remoteRegistries.Mu.Unlock()

	iter.Release()
	errLog("Error while pulling DB into registry cache: ", iter.Error())
}

// push writes the registry cache and the remote registry list to SQLite
// using the prepared INSERT OR REPLACE statement.
//
// Each user produces one row per field (dataKey "nickname", "rlen", "uip",
// "date") plus one row per status, keyed by the status time formatted as
// RFC3339. Each remote registry produces a row with isUser = false.
//
// If the database cannot be pinged, that error is returned and nothing is
// written. Failures of individual inserts are logged and do not stop the
// push; in that case the method still returns nil.
//
// NOTE(review): unlike dbLevel.push, this method does not update
// confObj.LastPush. Confirm the caller does; otherwise dbTimer will keep
// reporting that a push is due.
func (lite dbSqlite) push() error {
	if err := lite.db.Ping(); err != nil {
		return err
	}

	twtxtCache.Mu.RLock()
	for i, e := range twtxtCache.Users {
		e.Mu.RLock()

		_, err := lite.pushStmt.Exec(i, true, "nickname", e.Nick)
		errLog("", err)
		_, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen)
		errLog("", err)
		// Store the IP in its textual form. pull decodes this column with
		// net.ParseIP(string(...)), which cannot read the raw address
		// bytes that binding the byte-slice value directly would store.
		_, err = lite.pushStmt.Exec(i, true, "uip", e.IP.String())
		errLog("", err)
		_, err = lite.pushStmt.Exec(i, true, "date", e.Date)
		errLog("", err)

		for k, v := range e.Status {
			_, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v)
			errLog("", err)
		}

		e.Mu.RUnlock()
	}
	twtxtCache.Mu.RUnlock()

	remoteRegistries.Mu.RLock()
	for _, e := range remoteRegistries.List {
		_, err := lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL")
		errLog("", err)
	}
	remoteRegistries.Mu.RUnlock()

	return nil
}

// pull runs the prepared SELECT and loads every row into the registry
// cache or the remote registry list.
//
// Rows with isUser = false are remote registries; their urlKey is appended
// to remoteRegistries.List. For user rows, dataKey selects the field
// ("nickname", "uip", "date", "rlen"); any other dataKey is treated as the
// RFC3339 timestamp of a status.
//
// Problems are logged rather than returned: if the query itself fails
// nothing is loaded, and a row that cannot be scanned or whose status
// timestamp does not parse is skipped.
func (lite dbSqlite) pull() {
	errLog("Error pinging sqlite DB: ", lite.db.Ping())

	rows, err := lite.pullStmt.Query()
	if err != nil {
		// rows is nil here; iterating it would panic.
		errLog("", err)
		return
	}
	defer rows.Close()

	twtxtCache.Mu.Lock()
	for rows.Next() {
		var (
			urls    string
			isUser  bool
			dataKey string
			dBlob   []byte
		)

		if err := rows.Scan(&urls, &isUser, &dataKey, &dBlob); err != nil {
			// Without this the zero values would be processed as a
			// remote registry with an empty URL.
			errLog("", err)
			continue
		}

		if !isUser {
			remoteRegistries.Mu.Lock()
			remoteRegistries.List = append(remoteRegistries.List, urls)
			remoteRegistries.Mu.Unlock()
			continue
		}

		// Reuse the cached user when one exists; only allocate otherwise.
		user, ok := twtxtCache.Users[urls]
		if !ok {
			user = registry.NewUser()
		}

		user.Mu.Lock()
		switch dataKey {
		case "nickname":
			user.Nick = string(dBlob)
		case "uip":
			user.IP = net.ParseIP(string(dBlob))
		case "date":
			user.Date = string(dBlob)
		case "rlen":
			user.RLen = string(dBlob)
		default:
			thetime, err := time.Parse(time.RFC3339, dataKey)
			if err != nil {
				// Do not file the status under the zero time.
				errLog("While pulling statuses from SQLite: ", err)
			} else {
				user.Status[thetime] = string(dBlob)
			}
		}
		user.Mu.Unlock()

		twtxtCache.Users[urls] = user
	}
	twtxtCache.Mu.Unlock()

	// rows.Next returning false can mean an error, not just end of data.
	errLog("Error while pulling DB into registry cache: ", rows.Err())

	remoteRegistries.Mu.Lock()
	remoteRegistries.List = dedupe(remoteRegistries.List)
	remoteRegistries.Mu.Unlock()
}