diff options
author | Ben Morrison <ben@gbmor.dev> | 2019-06-11 03:58:43 -0400 |
---|---|---|
committer | Ben Morrison <ben@gbmor.dev> | 2019-06-11 04:54:50 -0400 |
commit | fdd9bd02f9ec37d25104dce8c1279a007aa980fc (patch) | |
tree | 96d5ba15eb81ca84d7b36a9406f50ac4b4042613 | |
parent | 1d9cef515a16602b7dc26b2fe06032b5e731b48e (diff) | |
download | getwtxt-fdd9bd02f9ec37d25104dce8c1279a007aa980fc.tar.gz |
cache and db now use time.Ticker for events. refactored some initialization.
-rw-r--r-- | svc/cache.go | 40 | ||||
-rw-r--r-- | svc/cache_test.go | 54 | ||||
-rw-r--r-- | svc/conf.go | 104 | ||||
-rw-r--r-- | svc/db.go | 34 | ||||
-rw-r--r-- | svc/init.go | 46 |
5 files changed, 170 insertions, 108 deletions
diff --git a/svc/cache.go b/svc/cache.go index 20c3953..3383016 100644 --- a/svc/cache.go +++ b/svc/cache.go @@ -9,6 +9,13 @@ import ( "time" ) +// These functions and types pertain to the +// in-memory data being used by the registry +// service, such as: +// - static assets (index.html, style.css) +// - the registry itself (users, etc) +// - list of other registries submitted + // RemoteRegistries holds a list of remote registries to // periodically scrape for new users. The remote registries // must have been added via POST like a user. @@ -17,6 +24,10 @@ type RemoteRegistries struct { List []string } +// staticAssets holda the rendered landing page +// as a byte slice, its on-disk mod time, the +// assets/style.css file as a byte slice, and +// its on-disk mod time. type staticAssets struct { mu sync.RWMutex index []byte @@ -25,6 +36,9 @@ type staticAssets struct { cssMod time.Time } +// Renders the landing page template using +// the info supplied in the configuration +// file's "Instance" section. func initTemplates() *template.Template { confObj.Mu.RLock() assetsDir := confObj.AssetsDir @@ -33,30 +47,7 @@ func initTemplates() *template.Template { return template.Must(template.ParseFiles(assetsDir + "/tmpl/index.html")) } -func cacheTimer() bool { - confObj.Mu.RLock() - answer := time.Since(confObj.LastCache) > confObj.CacheInterval - confObj.Mu.RUnlock() - - return answer -} - -// Launched by init as a coroutine to watch -// for the update intervals to pass. -func cacheAndPush() { - for { - if cacheTimer() { - refreshCache() - } - if dbTimer() { - errLog("Error pushing cache to database: ", pushDB()) - } - time.Sleep(1000 * time.Millisecond) - } -} - -func refreshCache() { - +func cacheUpdate() { // This clusterfuck of mutex read locks is // necessary to avoid deadlock. This mess // also avoids a panic that would occur @@ -84,7 +75,6 @@ func refreshCache() { // need to be re-cached. If they do, they are // pulled back into memory from disk. func pingAssets() { - confObj.Mu.RLock() assetsDir := confObj.AssetsDir confObj.Mu.RUnlock() diff --git a/svc/cache_test.go b/svc/cache_test.go index 29b92b8..28e5c0b 100644 --- a/svc/cache_test.go +++ b/svc/cache_test.go @@ -2,76 +2,32 @@ package svc // import "github.com/getwtxt/getwtxt/svc" import ( "testing" - "time" ) -func Test_cacheTimer(t *testing.T) { - initTestConf() - dur, _ := time.ParseDuration("5m") - back30, _ := time.ParseDuration("-30m") - - cases := []struct { - name string - lastCache time.Time - interval time.Duration - expect bool - }{ - { - name: "Past Interval", - lastCache: time.Now().Add(back30), - interval: dur, - expect: true, - }, - { - name: "Before Interval", - lastCache: time.Now(), - interval: dur, - expect: false, - }, - } - - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - - confObj.Mu.Lock() - confObj.LastCache = tt.lastCache - confObj.CacheInterval = tt.interval - confObj.Mu.Unlock() - - res := cacheTimer() - - if res != tt.expect { - t.Errorf("Got %v, expected %v\n", res, tt.expect) - } - }) - } - -} - -func Test_refreshCache(t *testing.T) { +func Test_cacheUpdate(t *testing.T) { initTestConf() confObj.Mu.RLock() prevtime := confObj.LastCache confObj.Mu.RUnlock() t.Run("Cache Time Check", func(t *testing.T) { - refreshCache() + cacheUpdate() confObj.Mu.RLock() newtime := confObj.LastCache confObj.Mu.RUnlock() if !newtime.After(prevtime) || newtime == prevtime { - t.Errorf("Cache time did not update, check refreshCache() logic\n") + t.Errorf("Cache time did not update, check cacheUpdate() logic\n") } }) } -func Benchmark_refreshCache(b *testing.B) { +func Benchmark_cacheUpdate(b *testing.B) { initTestConf() b.ResetTimer() for i := 0; i < b.N; i++ { - refreshCache() + cacheUpdate() } } diff --git a/svc/conf.go b/svc/conf.go index 0d9e739..ccd819c 100644 --- a/svc/conf.go +++ b/svc/conf.go @@ -12,7 +12,8 @@ import ( "github.com/spf13/viper" ) -// Configuration object definition +// Configuration values are held in an instance of +// this struct. type Configuration struct { Mu sync.RWMutex Port int `yaml:"ListenPort"` @@ -28,7 +29,8 @@ type Configuration struct { Instance `yaml:"Instance"` } -// Instance refers to this specific instance of getwtxt +// Instance refers to meta data about +// this specific instance of getwtxt type Instance struct { Vers string `yaml:"-"` Name string `yaml:"Instance.SiteName"` @@ -38,29 +40,104 @@ type Instance struct { Desc string `yaml:"Instance.Description"` } +// This is a wrapper for a *time.Ticker +// that adds another channel. It's used +// to signal to the ticker goroutines +// that they should stop the tickers +// and exit. +type tick struct { + isDB bool + t *time.Ticker + exit chan bool +} + +// Creates a new instance of a tick +func initTicker(db bool, interval time.Duration) *tick { + return &tick{ + isDB: db, + t: time.NewTicker(interval), + exit: make(chan bool, 1), + } +} + +// Sends the signal to stop the tickers +// and for their respective goroutines +// to exit. +func killTickers() { + ct := <-cTickC + dt := <-dbTickC + ct.exit <- true + dt.exit <- true +} + +// Waits for a signal from the database +// *tick. Either stops the ticker and +// kills the goroutine or it will +// update cache / push the DB to disk +func dataTimer(tkr *tick) { + for { + select { + case signal := <-tkr.t.C: + if tkr.isDB { + errLog("", pushDB()) + log.Printf("Database push took: %v\n", time.Since(signal)) + continue + } + cacheUpdate() + log.Printf("Cache update took: %v\n", time.Since(signal)) + case <-tkr.exit: + tkr.t.Stop() + return + } + } +} + +// Called on start-up. Initializes everything +// related to configuration values. func initConfig() { + log.Printf("Loading configuration ...\n") parseConfigFlag() - setConfigDefaults() - log.Printf("Loading configuration ...\n") if err := viper.ReadInConfig(); err != nil { log.Printf("%v\n", err.Error()) log.Printf("Using defaults ...\n") bindConfig() return } - viper.WatchConfig() - viper.OnConfigChange(func(e fsnotify.Event) { - log.Printf("Config file change detected. Reloading...\n") - bindConfig() - initLogging() - }) + viper.OnConfigChange(reInit) + + bindConfig() +} + +// Called when a change is detected in the +// configuration file. Closes log file, +// closes database connection, stops all +// tickers, then binds new configuration +// values, opens new log file, connects to +// new database, and starts new cache and +// database tickers. +func reInit(e fsnotify.Event) { + log.Printf("%v. Reloading...\n", e.String()) + + if !confObj.StdoutLogging { + closeLog <- true + } + + killTickers() + killDB() + bindConfig() + + initLogging() + initDatabase() + initPersistence() } +// Registers either stdout or a specified file +// to the default logger. func initLogging() { confObj.Mu.RLock() @@ -92,6 +169,8 @@ func initLogging() { confObj.Mu.RUnlock() } +// Default values should a config file +// not be available. func setConfigDefaults() { viper.SetDefault("ListenPort", 9001) viper.SetDefault("LogFile", "getwtxt.log") @@ -109,6 +188,8 @@ func setConfigDefaults() { viper.SetDefault("Instance.Description", "A fast, resilient twtxt registry server written in Go!") } +// Reads data from the configuration +// flag and acts accordingly. func parseConfigFlag() { if *flagConfFile == "" { viper.SetConfigName("getwtxt") @@ -130,6 +211,9 @@ func parseConfigFlag() { } } +// Simply goes down the list of fields +// in the confObj instance of &Configuration{}, +// assigning values from the config file. func bindConfig() { confObj.Mu.Lock() diff --git a/svc/db.go b/svc/db.go index 3841d83..260c5cc 100644 --- a/svc/db.go +++ b/svc/db.go @@ -1,18 +1,26 @@ package svc // import "github.com/getwtxt/getwtxt/svc" import ( - "time" - "github.com/syndtr/goleveldb/leveldb" "golang.org/x/sys/unix" ) +// Everything in this file is database-agnostic. +// Functions and types related to specific kinds +// of databases will be in their own respective +// files, such as: +// - leveldb.go +// - sqlite.go + +// Abstraction to allow several different +// databases to be used interchangeably. type dbase interface { push() error pull() } -// Pull DB data into cache, if available. +// Opens a new connection to the specified +// database, then reads it into memory. func initDatabase() { var db dbase confObj.Mu.RLock() @@ -35,16 +43,19 @@ func initDatabase() { pullDB() } -func dbTimer() bool { - confObj.Mu.RLock() - answer := time.Since(confObj.LastPush) > confObj.DBInterval - confObj.Mu.RUnlock() - - return answer +// Close the database connection. +func killDB() { + db := <-dbChan + switch dbType := db.(type) { + case *dbLevel: + errLog("", dbType.db.Close()) + case *dbSqlite: + errLog("", dbType.db.Close()) + } } -// Pushes the registry's cache data to a local -// database for safe keeping. +// Pushes the registry's cache data +// to a local database for safe keeping. func pushDB() error { db := <-dbChan err := db.push() @@ -55,6 +66,7 @@ func pushDB() error { return err } +// Reads the database from disk into memory. func pullDB() { db := <-dbChan db.pull() diff --git a/svc/init.go b/svc/init.go index e63ffdd..71527cf 100644 --- a/svc/init.go +++ b/svc/init.go @@ -24,24 +24,32 @@ var ( flagDBType *string = pflag.StringP("dbtype", "t", "", "Type of database being used") ) +// Holds the global configuration var confObj = &Configuration{} -// signals to close the log file +// Signals to close the log file var closeLog = make(chan bool, 1) -// used to transmit database pointer after -// initialization +// Used to transmit database pointer, database ticker, +// and cache ticker after initialization var dbChan = make(chan dbase, 1) +var dbTickC = make(chan *tick, 1) +var cTickC = make(chan *tick, 1) +// Used to manage the landing page template var tmpls *template.Template +// Holds the registry data in-memory var twtxtCache = registry.NewIndex() +// List of other registries submitted to this registry var remoteRegistries = &RemoteRegistries{ Mu: sync.RWMutex{}, List: make([]string, 0), } +// In-memory cache of static assets, specifically +// the parsed landing page and the stylesheet. var staticCache = &staticAssets{} func errFatal(context string, err error) { @@ -62,13 +70,15 @@ func errLog(context string, err error) { func initSvc() { checkFlags() titleScreen() + initConfig() initLogging() initDatabase() - go cacheAndPush() tmpls = initTemplates() - watchForInterrupt() + initPersistence() + pingAssets() + watchForInterrupt() } func checkFlags() { @@ -90,6 +100,22 @@ func checkFlags() { } } +// Starts the tickers that periodically: +// - pull new user statuses into cache +// - push cached data to disk +func initPersistence() { + confObj.Mu.RLock() + cacheTkr := initTicker(false, confObj.CacheInterval) + dbTkr := initTicker(true, confObj.DBInterval) + confObj.Mu.RUnlock() + + go dataTimer(cacheTkr) + go dataTimer(dbTkr) + + dbTickC <- dbTkr + cTickC <- cacheTkr +} + // Watch for SIGINT aka ^C // Close the log file then exit func watchForInterrupt() { @@ -103,14 +129,8 @@ func watchForInterrupt() { confObj.Mu.RLock() log.Printf("Closing database connection to %v...\n", confObj.DBPath) - db := <-dbChan - - switch dbType := db.(type) { - case *dbLevel: - errLog("", dbType.db.Close()) - case *dbSqlite: - errLog("", dbType.db.Close()) - } + killTickers() + killDB() if !confObj.StdoutLogging { closeLog <- true |