diff options
author | Ben Morrison <ben@gbmor.dev> | 2019-05-21 04:13:21 -0400 |
---|---|---|
committer | Ben Morrison <ben@gbmor.dev> | 2019-05-21 04:13:21 -0400 |
commit | c896e6b85ff32c3c0c45336de8b75752a31574ca (patch) | |
tree | f9bbacbf49949e4b86adf676b33decbddec40fef | |
parent | bd23ef0959496aba4c6fc8ca2b3969bbf17aa9d5 (diff) | |
download | getwtxt-c896e6b85ff32c3c0c45336de8b75752a31574ca.tar.gz |
database push/pull functions
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | cache.go | 159 | ||||
-rw-r--r-- | init.go | 32 | ||||
-rw-r--r-- | post.go | 6 |
4 files changed, 190 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore index 71d2264..bd018f3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ getwtxt *.log local/ +*.db/ diff --git a/cache.go b/cache.go new file mode 100644 index 0000000..253de98 --- /dev/null +++ b/cache.go @@ -0,0 +1,159 @@ +package main + +import ( + "log" + "reflect" + "strings" + "time" + + "github.com/getwtxt/registry" + "github.com/syndtr/goleveldb/leveldb" +) + +// checks if it's time to refresh the cache or not +func checkCacheTime() bool { + return time.Since(confObj.lastCache) > confObj.cacheInterval +} + +// checks if it's time to push the cache to the database +func checkDBtime() bool { + return time.Since(confObj.lastPush) > confObj.dbInterval +} + +// launched by init as a goroutine to constantly watch +// for the update interval to pass +func cacheAndPush() { + for { + if checkCacheTime() { + refreshCache() + } + if checkDBtime() { + if err := pushDatabase(); err != nil { + log.Printf("Error pushing cache to database: %v\n", err) + } + } + } +} + +// refreshes the cache +func refreshCache() { + + for k := range twtxtCache.Reg { + err := twtxtCache.UpdateUser(k) + if err != nil { + log.Printf("%v\n", err) + continue + } + } + + for _, v := range remoteRegistries.List { + err := twtxtCache.ScrapeRemoteRegistry(v) + if err != nil { + log.Printf("Error while refreshing local copy of remote registry user data: %v\n", err) + } + } + confObj.mu.Lock() + confObj.lastCache = time.Now() + confObj.mu.Unlock() +} + +// pushes the registry's cache data to a local +// database for safe keeping +func pushDatabase() error { + db := <-dbChan + twtxtCache.Mu.RLock() + + // create a batch write job so it can + // be done at one time rather than + // per value + var dbBasket *leveldb.Batch + for k, v := range twtxtCache.Reg { + dbBasket.Put([]byte(k+".Nick"), []byte(v.Nick)) + dbBasket.Put([]byte(k+".URL"), []byte(v.URL)) + dbBasket.Put([]byte(k+".IP"), []byte(v.IP)) + dbBasket.Put([]byte(k+".Date"), []byte(v.Date)) + for i, e := range v.Status { + dbBasket.Put([]byte(k+".Status."+i.String()), []byte(e)) + } + } + + // save our list of remote registries to scrape + for k, v := range remoteRegistries.List { + dbBasket.Put([]byte("remote."+string(k)), []byte(v)) + } + + // execute the batch job + if err := db.Write(dbBasket, nil); err != nil { + return err + } + + twtxtCache.Mu.RUnlock() + dbChan <- db + + // update the last push time + confObj.mu.Lock() + confObj.lastPush = time.Now() + confObj.mu.Unlock() + + return nil +} + +// pulls registry data from the DB on startup +func pullDatabase() { + db := <-dbChan + + iter := db.NewIterator(nil, nil) + + for iter.Next() { + key := iter.Key() + val := iter.Value() + + split := strings.Split(string(key), ".") + urls := string(split[0]) + field := string(split[1]) + data := registry.NewUserData() + + twtxtCache.Mu.RLock() + if _, ok := twtxtCache.Reg[urls]; ok { + data = twtxtCache.Reg[urls] + } + twtxtCache.Mu.RUnlock() + + ref := reflect.ValueOf(data).Elem() + + if field != "Status" && urls != "remote" { + for i := 0; i < ref.NumField(); i++ { + + f := ref.Field(i) + if f.String() == field { + f.Set(reflect.ValueOf(val)) + break + } + } + } else if field == "Status" && urls != "remote" { + + thetime, err := time.Parse("RFC3339", split[2]) + if err != nil { + log.Printf("%v\n", err) + } + data.Status[thetime] = string(val) + + } else { + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, string(val)) + remoteRegistries.Mu.Unlock() + } + + twtxtCache.Mu.Lock() + twtxtCache.Reg[urls] = data + twtxtCache.Mu.Unlock() + } + + iter.Release() + err := iter.Error() + if err != nil { + log.Printf("Error while pulling DB into registry cache: %v\n", err) + } + + dbChan <- db +} diff --git a/init.go b/init.go index 47304c5..6896a1a 100644 --- a/init.go +++ b/init.go @@ -33,6 +33,11 @@ var closelog = make(chan bool, 1) // initialization var dbChan = make(chan *leveldb.DB, 1) +// provides access to the database so it +// can be closed outside of the init function's +// scope. +var dbCloseChan = make(chan *leveldb.DB, 1) + // templates var tmpls *template.Template @@ -40,7 +45,7 @@ var tmpls *template.Template var twtxtCache = registry.NewIndex() // remote registry listing -var remote = &RemoteRegistries{} +var remoteRegistries = &RemoteRegistries{} func init() { checkFlags() @@ -48,6 +53,8 @@ func init() { initConfig() initLogging() tmpls = initTemplates() + initDatabase() + go cacheAndPush() watchForInterrupt() } @@ -107,6 +114,8 @@ func initConfig() { dbDur, _ = time.ParseDuration("5m") } + thetime := time.Now() + confObj.mu.Lock() confObj.port = viper.GetInt("port") confObj.logFile = viper.GetString("logFile") @@ -114,7 +123,8 @@ func initConfig() { confObj.stdoutLogging = viper.GetBool("stdoutLogging") confObj.cacheInterval = dur confObj.dbInterval = dbDur - confObj.lastCache = time.Now() + confObj.lastCache = thetime + confObj.lastPush = thetime confObj.version = getwtxt confObj.Instance.Name = viper.GetString("instance.name") confObj.Instance.URL = viper.GetString("instance.url") @@ -216,6 +226,9 @@ func initDatabase() { } dbChan <- db + dbCloseChan <- db + + pullDatabase() } // Watch for SIGINT aka ^C @@ -226,19 +239,28 @@ func watchForInterrupt() { go func() { for sigint := range c { - log.Printf("\n\nCaught %v. Cleaning up ...\n", sigint) + log.Printf("\n\nCaught %v. Cleaning up ...\n", sigint) confObj.mu.RLock() + + // Close the database cleanly + log.Printf("Closing database connection to %v...\n", confObj.dbPath) + db := <-dbCloseChan + if err := db.Close(); err != nil { + log.Printf("%v\n", err) + } + if !confObj.stdoutLogging { // signal to close the log file closelog <- true } - confObj.mu.RUnlock() + confObj.mu.RUnlock() + close(dbCloseChan) close(closelog) // Let everything catch up - time.Sleep(30 * time.Millisecond) + time.Sleep(100 * time.Millisecond) os.Exit(0) } }() diff --git a/post.go b/post.go index 5091a0e..e829538 100644 --- a/post.go +++ b/post.go @@ -33,9 +33,9 @@ func apiPostUser(w http.ResponseWriter, r *http.Request) { } if remoteRegistry { - remote.Mu.Lock() - remote.List = append(remote.List, urls) - remote.Mu.Unlock() + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, urls) + remoteRegistries.Mu.Unlock() err := twtxtCache.ScrapeRemoteRegistry(urls) if err != nil { |