about summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorBen Morrison <ben@gbmor.dev>2019-05-21 04:13:21 -0400
committerBen Morrison <ben@gbmor.dev>2019-05-21 04:13:21 -0400
commitc896e6b85ff32c3c0c45336de8b75752a31574ca (patch)
treef9bbacbf49949e4b86adf676b33decbddec40fef
parentbd23ef0959496aba4c6fc8ca2b3969bbf17aa9d5 (diff)
downloadgetwtxt-c896e6b85ff32c3c0c45336de8b75752a31574ca.tar.gz
database push/pull functions
-rw-r--r--.gitignore1
-rw-r--r--cache.go159
-rw-r--r--init.go32
-rw-r--r--post.go6
4 files changed, 190 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore
index 71d2264..bd018f3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 getwtxt
 *.log
 local/
+*.db/
diff --git a/cache.go b/cache.go
new file mode 100644
index 0000000..253de98
--- /dev/null
+++ b/cache.go
@@ -0,0 +1,159 @@
+package main
+
+import (
+	"log"
+	"reflect"
+	"strings"
+	"time"
+
+	"github.com/getwtxt/registry"
+	"github.com/syndtr/goleveldb/leveldb"
+)
+
+// checks if it's time to refresh the cache or not
+func checkCacheTime() bool {
+	return time.Since(confObj.lastCache) > confObj.cacheInterval
+}
+
+// checks if it's time to push the cache to the database
+func checkDBtime() bool {
+	return time.Since(confObj.lastPush) > confObj.dbInterval
+}
+
+// launched by init as a goroutine to constantly watch
+// for the update interval to pass
+func cacheAndPush() {
+	for {
+		if checkCacheTime() {
+			refreshCache()
+		}
+		if checkDBtime() {
+			if err := pushDatabase(); err != nil {
+				log.Printf("Error pushing cache to database: %v\n", err)
+			}
+		}
+	}
+}
+
+// refreshes the cache
+func refreshCache() {
+
+	for k := range twtxtCache.Reg {
+		err := twtxtCache.UpdateUser(k)
+		if err != nil {
+			log.Printf("%v\n", err)
+			continue
+		}
+	}
+
+	for _, v := range remoteRegistries.List {
+		err := twtxtCache.ScrapeRemoteRegistry(v)
+		if err != nil {
+			log.Printf("Error while refreshing local copy of remote registry user data: %v\n", err)
+		}
+	}
+	confObj.mu.Lock()
+	confObj.lastCache = time.Now()
+	confObj.mu.Unlock()
+}
+
+// pushes the registry's cache data to a local
+// database for safe keeping
+func pushDatabase() error {
+	db := <-dbChan
+	twtxtCache.Mu.RLock()
+
+	// create a batch write job so it can
+	// be done at one time rather than
+	// per value
+	var dbBasket *leveldb.Batch
+	for k, v := range twtxtCache.Reg {
+		dbBasket.Put([]byte(k+".Nick"), []byte(v.Nick))
+		dbBasket.Put([]byte(k+".URL"), []byte(v.URL))
+		dbBasket.Put([]byte(k+".IP"), []byte(v.IP))
+		dbBasket.Put([]byte(k+".Date"), []byte(v.Date))
+		for i, e := range v.Status {
+			dbBasket.Put([]byte(k+".Status."+i.String()), []byte(e))
+		}
+	}
+
+	// save our list of remote registries to scrape
+	for k, v := range remoteRegistries.List {
+		dbBasket.Put([]byte("remote."+string(k)), []byte(v))
+	}
+
+	// execute the batch job
+	if err := db.Write(dbBasket, nil); err != nil {
+		return err
+	}
+
+	twtxtCache.Mu.RUnlock()
+	dbChan <- db
+
+	// update the last push time
+	confObj.mu.Lock()
+	confObj.lastPush = time.Now()
+	confObj.mu.Unlock()
+
+	return nil
+}
+
+// pulls registry data from the DB on startup
+func pullDatabase() {
+	db := <-dbChan
+
+	iter := db.NewIterator(nil, nil)
+
+	for iter.Next() {
+		key := iter.Key()
+		val := iter.Value()
+
+		split := strings.Split(string(key), ".")
+		urls := string(split[0])
+		field := string(split[1])
+		data := registry.NewUserData()
+
+		twtxtCache.Mu.RLock()
+		if _, ok := twtxtCache.Reg[urls]; ok {
+			data = twtxtCache.Reg[urls]
+		}
+		twtxtCache.Mu.RUnlock()
+
+		ref := reflect.ValueOf(data).Elem()
+
+		if field != "Status" && urls != "remote" {
+			for i := 0; i < ref.NumField(); i++ {
+
+				f := ref.Field(i)
+				if f.String() == field {
+					f.Set(reflect.ValueOf(val))
+					break
+				}
+			}
+		} else if field == "Status" && urls != "remote" {
+
+			thetime, err := time.Parse("RFC3339", split[2])
+			if err != nil {
+				log.Printf("%v\n", err)
+			}
+			data.Status[thetime] = string(val)
+
+		} else {
+			remoteRegistries.Mu.Lock()
+			remoteRegistries.List = append(remoteRegistries.List, string(val))
+			remoteRegistries.Mu.Unlock()
+		}
+
+		twtxtCache.Mu.Lock()
+		twtxtCache.Reg[urls] = data
+		twtxtCache.Mu.Unlock()
+	}
+
+	iter.Release()
+	err := iter.Error()
+	if err != nil {
+		log.Printf("Error while pulling DB into registry cache: %v\n", err)
+	}
+
+	dbChan <- db
+}
diff --git a/init.go b/init.go
index 47304c5..6896a1a 100644
--- a/init.go
+++ b/init.go
@@ -33,6 +33,11 @@ var closelog = make(chan bool, 1)
 // initialization
 var dbChan = make(chan *leveldb.DB, 1)
 
+// provides access to the database so it
+// can be closed outside of the init function's
+// scope.
+var dbCloseChan = make(chan *leveldb.DB, 1)
+
 // templates
 var tmpls *template.Template
 
@@ -40,7 +45,7 @@ var tmpls *template.Template
 var twtxtCache = registry.NewIndex()
 
 // remote registry listing
-var remote = &RemoteRegistries{}
+var remoteRegistries = &RemoteRegistries{}
 
 func init() {
 	checkFlags()
@@ -48,6 +53,8 @@ func init() {
 	initConfig()
 	initLogging()
 	tmpls = initTemplates()
+	initDatabase()
+	go cacheAndPush()
 	watchForInterrupt()
 }
 
@@ -107,6 +114,8 @@ func initConfig() {
 		dbDur, _ = time.ParseDuration("5m")
 	}
 
+	thetime := time.Now()
+
 	confObj.mu.Lock()
 	confObj.port = viper.GetInt("port")
 	confObj.logFile = viper.GetString("logFile")
@@ -114,7 +123,8 @@ func initConfig() {
 	confObj.stdoutLogging = viper.GetBool("stdoutLogging")
 	confObj.cacheInterval = dur
 	confObj.dbInterval = dbDur
-	confObj.lastCache = time.Now()
+	confObj.lastCache = thetime
+	confObj.lastPush = thetime
 	confObj.version = getwtxt
 	confObj.Instance.Name = viper.GetString("instance.name")
 	confObj.Instance.URL = viper.GetString("instance.url")
@@ -216,6 +226,9 @@ func initDatabase() {
 	}
 
 	dbChan <- db
+	dbCloseChan <- db
+
+	pullDatabase()
 }
 
 // Watch for SIGINT aka ^C
@@ -226,19 +239,28 @@ func watchForInterrupt() {
 
 	go func() {
 		for sigint := range c {
-			log.Printf("\n\nCaught %v. Cleaning up ...\n", sigint)
 
+			log.Printf("\n\nCaught %v. Cleaning up ...\n", sigint)
 			confObj.mu.RLock()
+
+			// Close the database cleanly
+			log.Printf("Closing database connection to %v...\n", confObj.dbPath)
+			db := <-dbCloseChan
+			if err := db.Close(); err != nil {
+				log.Printf("%v\n", err)
+			}
+
 			if !confObj.stdoutLogging {
 				// signal to close the log file
 				closelog <- true
 			}
-			confObj.mu.RUnlock()
 
+			confObj.mu.RUnlock()
+			close(dbCloseChan)
 			close(closelog)
 
 			// Let everything catch up
-			time.Sleep(30 * time.Millisecond)
+			time.Sleep(100 * time.Millisecond)
 			os.Exit(0)
 		}
 	}()
diff --git a/post.go b/post.go
index 5091a0e..e829538 100644
--- a/post.go
+++ b/post.go
@@ -33,9 +33,9 @@ func apiPostUser(w http.ResponseWriter, r *http.Request) {
 	}
 
 	if remoteRegistry {
-		remote.Mu.Lock()
-		remote.List = append(remote.List, urls)
-		remote.Mu.Unlock()
+		remoteRegistries.Mu.Lock()
+		remoteRegistries.List = append(remoteRegistries.List, urls)
+		remoteRegistries.Mu.Unlock()
 
 		err := twtxtCache.ScrapeRemoteRegistry(urls)
 		if err != nil {