From 78f4d8a6c1f8c41543dd5c1522ec1100a52a1283 Mon Sep 17 00:00:00 2001 From: Ben Morrison Date: Thu, 6 Jun 2019 18:26:16 -0400 Subject: sqlite functionality added --- svc/cache.go | 12 ++--- svc/conf.go | 10 ++-- svc/db.go | 149 +++++++++++++++++++++++++++++++++++++++++++------------- svc/handlers.go | 7 +-- svc/init.go | 7 ++- svc/svc.go | 3 +- 6 files changed, 127 insertions(+), 61 deletions(-) (limited to 'svc') diff --git a/svc/cache.go b/svc/cache.go index 04ed2cd..0963892 100644 --- a/svc/cache.go +++ b/svc/cache.go @@ -40,8 +40,7 @@ func cacheAndPush() { refreshCache() } if dbTimer() { - err := pushDB() - errLog("Error pushing cache to database: ", err) + errLog("Error pushing cache to database: ", pushDB()) } time.Sleep(1000 * time.Millisecond) } @@ -57,16 +56,14 @@ func refreshCache() { twtxtCache.Mu.RLock() for k := range twtxtCache.Users { twtxtCache.Mu.RUnlock() - err := twtxtCache.UpdateUser(k) - errLog("", err) + errLog("", twtxtCache.UpdateUser(k)) twtxtCache.Mu.RLock() } twtxtCache.Mu.RUnlock() remoteRegistries.Mu.RLock() for _, v := range remoteRegistries.List { - err := twtxtCache.CrawlRemoteRegistry(v) - errLog("Error refreshing local copy of remote registry data: ", err) + errLog("Error refreshing local copy of remote registry data: ", twtxtCache.CrawlRemoteRegistry(v)) } remoteRegistries.Mu.RUnlock() confObj.Mu.Lock() @@ -101,9 +98,8 @@ func pingAssets() { buf := bytes.NewBuffer(b) confObj.Mu.RLock() - err = tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance) + errLog("", tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance)) confObj.Mu.RUnlock() - errLog("", err) staticCache.mu.Lock() staticCache.index = buf.Bytes() diff --git a/svc/conf.go b/svc/conf.go index 24ff305..0aaee14 100644 --- a/svc/conf.go +++ b/svc/conf.go @@ -58,9 +58,7 @@ func initLogging() { } else { logfile, err := os.OpenFile(confObj.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - log.Printf("Could not open log file: %v\n", err.Error()) - } + errLog("Could not open log file: ", err) // Listen for the signal to close the log file // in a separate thread. Passing it as an argument @@ -69,12 +67,10 @@ func initLogging() { go func(logfile *os.File) { <-closeLog + log.Printf("Closing log file ...\n") + errLog("Could not close log file: ", logfile.Close()) - err = logfile.Close() - if err != nil { - log.Printf("Couldn't close log file: %v\n", err.Error()) - } }(logfile) log.SetOutput(logfile) diff --git a/svc/db.go b/svc/db.go index 6a614f1..36c682b 100644 --- a/svc/db.go +++ b/svc/db.go @@ -21,47 +21,30 @@ type dbLevel struct { } type dbSqlite struct { - db *sql.DB -} - -type dbPostgres struct { - db *sql.DB + db *sql.DB + pullStmt *sql.Stmt + pushStmt *sql.Stmt } // Pull DB data into cache, if available. func initDatabase() { var db dbase - var err error confObj.Mu.RLock() switch confObj.DBType { case "leveldb": - var lvl *leveldb.DB - lvl, err = leveldb.OpenFile(confObj.DBPath, nil) + lvl, err := leveldb.OpenFile(confObj.DBPath, nil) + errFatal("", err) db = &dbLevel{db: lvl} case "sqlite": - var lite *sql.DB - lite, err := sql.Open("sqlite3", confObj.DBPath) - errFatal("Error opening sqlite3 DB: ", err) - litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT PRIMARY KEY, isUser BOOL, blobKey TEXT, data BLOB)") - errFatal("Error preparing sqlite3 DB: ", err) - _, err = litePrep.Exec() - errFatal("Error creating sqlite3 DB: ", err) - db = &dbSqlite{db: lite} - - case "postgres": - var pg *sql.DB - db = &dbPostgres{db: pg} + db = initSqlite() } confObj.Mu.RUnlock() - errFatal("", err) - dbChan <- db - pullDB() } @@ -73,6 +56,30 @@ func dbTimer() bool { return answer } +func initSqlite() *dbSqlite { + + lite, err := sql.Open("sqlite3", confObj.DBPath) + errFatal("Error opening sqlite3 DB: ", err) + + litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)") + errFatal("Error preparing sqlite3 DB: ", err) + + _, err = litePrep.Exec() + errFatal("Error creating sqlite3 DB: ", err) + + push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)") + errFatal("", err) + + pull, err := lite.Prepare("SELECT * FROM getwtxt") + errFatal("", err) + + return &dbSqlite{ + db: lite, + pushStmt: push, + pullStmt: pull, + } +} + // Pushes the registry's cache data to a local // database for safe keeping. func pushDB() error { @@ -90,6 +97,7 @@ func pullDB() { } func (lvl dbLevel) push() error { + twtxtCache.Mu.RLock() var dbBasket = &leveldb.Batch{} for k, v := range twtxtCache.Users { @@ -117,9 +125,7 @@ func (lvl dbLevel) push() error { confObj.LastPush = time.Now() confObj.Mu.Unlock() - err := lvl.db.Write(dbBasket, nil) - - return err + return lvl.db.Write(dbBasket, nil) } func (lvl dbLevel) pull() { @@ -144,10 +150,13 @@ func (lvl dbLevel) pull() { data := registry.NewUser() twtxtCache.Mu.RLock() if _, ok := twtxtCache.Users[urls]; ok { + twtxtCache.Users[urls].Mu.RLock() data = twtxtCache.Users[urls] + twtxtCache.Users[urls].Mu.RUnlock() } twtxtCache.Mu.RUnlock() + data.Mu.Lock() switch field { case "IP": data.IP = net.ParseIP(val) @@ -164,11 +173,11 @@ func (lvl dbLevel) pull() { errLog("", err) data.Status[thetime] = val } + data.Mu.Unlock() twtxtCache.Mu.Lock() twtxtCache.Users[urls] = data twtxtCache.Mu.Unlock() - } remoteRegistries.Mu.Lock() @@ -176,24 +185,96 @@ func (lvl dbLevel) pull() { remoteRegistries.Mu.Unlock() iter.Release() - err := iter.Error() - errLog("Error while pulling DB into registry cache: ", err) + errLog("Error while pulling DB into registry cache: ", iter.Error()) } func (lite dbSqlite) push() error { + err := lite.db.Ping() + if err != nil { + return err + } + + twtxtCache.Mu.RLock() + for i, e := range twtxtCache.Users { + e.Mu.RLock() + + _, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "uip", e.IP) + errLog("", err) + _, err = lite.pushStmt.Exec(i, true, "date", e.Date) + errLog("", err) + + for k, v := range e.Status { + _, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v) + errLog("", err) + } + + e.Mu.RUnlock() + } + twtxtCache.Mu.RUnlock() + + remoteRegistries.Mu.RLock() + for _, e := range remoteRegistries.List { + _, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL") + errLog("", err) + } + remoteRegistries.Mu.RUnlock() return nil } func (lite dbSqlite) pull() { + errLog("Error pinging sqlite DB: ", lite.db.Ping()) -} + rows, err := lite.pullStmt.Query() + errLog("", err) -func (pg dbPostgres) push() error { + twtxtCache.Mu.Lock() + for rows.Next() { + var urls string + var isUser bool + var dataKey string + var dBlob []byte - return nil -} + errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob)) + + if !isUser { + remoteRegistries.Mu.Lock() + remoteRegistries.List = append(remoteRegistries.List, urls) + remoteRegistries.Mu.Unlock() + continue + } + + user := registry.NewUser() + if _, ok := twtxtCache.Users[urls]; ok { + user = twtxtCache.Users[urls] + } + user.Mu.Lock() + + switch dataKey { + case "nickname": + user.Nick = string(dBlob) + case "uip": + user.IP = net.ParseIP(string(dBlob)) + case "date": + user.Date = string(dBlob) + case "rlen": + user.RLen = string(dBlob) + default: + thetime, err := time.Parse(time.RFC3339, dataKey) + errLog("While pulling statuses from SQLite: ", err) + user.Status[thetime] = string(dBlob) + } -func (pg dbPostgres) pull() { + twtxtCache.Users[urls] = user + user.Mu.Unlock() + } + twtxtCache.Mu.Unlock() + remoteRegistries.Mu.Lock() + remoteRegistries.List = dedupe(remoteRegistries.List) + remoteRegistries.Mu.Unlock() } diff --git a/svc/handlers.go b/svc/handlers.go index 13555d5..5bb0d4f 100644 --- a/svc/handlers.go +++ b/svc/handlers.go @@ -79,9 +79,6 @@ func apiAllTweetsHandler(w http.ResponseWriter, r *http.Request) { } data := parseQueryOut(out) - if err != nil { - data = []byte("") - } etag := fmt.Sprintf("%x", sha256.Sum256(data)) w.Header().Set("ETag", etag) @@ -140,11 +137,9 @@ func apiEndpointHandler(w http.ResponseWriter, r *http.Request) { out, err = twtxtCache.QueryAllStatuses() out = registry.ReduceToPage(page, out) } + errLog("", err) data := parseQueryOut(out) - if err != nil { - data = []byte("") - } etag := fmt.Sprintf("%x", sha256.Sum256(data)) w.Header().Set("ETag", etag) diff --git a/svc/init.go b/svc/init.go index 8e9bb20..7db5303 100644 --- a/svc/init.go +++ b/svc/init.go @@ -101,11 +101,10 @@ func watchForInterrupt() { db := <-dbChan switch dbType := db.(type) { - case *dbLevel: - lvl := dbType - err := lvl.db.Close() - errLog("", err) + errLog("", dbType.db.Close()) + case *dbSqlite: + errLog("", dbType.db.Close()) } if !confObj.StdoutLogging { diff --git a/svc/svc.go b/svc/svc.go index 717a4d6..3130c56 100644 --- a/svc/svc.go +++ b/svc/svc.go @@ -112,8 +112,7 @@ func Start() { } log.Printf("Listening on %v\n", portnum) - err := server.ListenAndServe() - errLog("", err) + errLog("", server.ListenAndServe()) closeLog <- true } -- cgit 1.4.1-2-gfad0