summary refs log tree commit diff stats
diff options
context:
space:
mode:
authorBen Morrison <ben@gbmor.dev>2019-06-06 18:26:16 -0400
committerBen Morrison <ben@gbmor.dev>2019-06-08 02:29:28 -0400
commit78f4d8a6c1f8c41543dd5c1522ec1100a52a1283 (patch)
treee5db1bcf37e647aa8caec5902b32c6f9ea9dbff1
parent3eb5a0321ea6e70a7ad3234efc9a0c1263833389 (diff)
downloadgetwtxt-78f4d8a6c1f8c41543dd5c1522ec1100a52a1283.tar.gz
sqlite functionality added
-rw-r--r--svc/cache.go12
-rw-r--r--svc/conf.go10
-rw-r--r--svc/db.go149
-rw-r--r--svc/handlers.go7
-rw-r--r--svc/init.go7
-rw-r--r--svc/svc.go3
6 files changed, 127 insertions, 61 deletions
diff --git a/svc/cache.go b/svc/cache.go
index 04ed2cd..0963892 100644
--- a/svc/cache.go
+++ b/svc/cache.go
@@ -40,8 +40,7 @@ func cacheAndPush() {
 			refreshCache()
 		}
 		if dbTimer() {
-			err := pushDB()
-			errLog("Error pushing cache to database: ", err)
+			errLog("Error pushing cache to database: ", pushDB())
 		}
 		time.Sleep(1000 * time.Millisecond)
 	}
@@ -57,16 +56,14 @@ func refreshCache() {
 	twtxtCache.Mu.RLock()
 	for k := range twtxtCache.Users {
 		twtxtCache.Mu.RUnlock()
-		err := twtxtCache.UpdateUser(k)
-		errLog("", err)
+		errLog("", twtxtCache.UpdateUser(k))
 		twtxtCache.Mu.RLock()
 	}
 	twtxtCache.Mu.RUnlock()
 
 	remoteRegistries.Mu.RLock()
 	for _, v := range remoteRegistries.List {
-		err := twtxtCache.CrawlRemoteRegistry(v)
-		errLog("Error refreshing local copy of remote registry data: ", err)
+		errLog("Error refreshing local copy of remote registry data: ", twtxtCache.CrawlRemoteRegistry(v))
 	}
 	remoteRegistries.Mu.RUnlock()
 	confObj.Mu.Lock()
@@ -101,9 +98,8 @@ func pingAssets() {
 		buf := bytes.NewBuffer(b)
 
 		confObj.Mu.RLock()
-		err = tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance)
+		errLog("", tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance))
 		confObj.Mu.RUnlock()
-		errLog("", err)
 
 		staticCache.mu.Lock()
 		staticCache.index = buf.Bytes()
diff --git a/svc/conf.go b/svc/conf.go
index 24ff305..0aaee14 100644
--- a/svc/conf.go
+++ b/svc/conf.go
@@ -58,9 +58,7 @@ func initLogging() {
 	} else {
 
 		logfile, err := os.OpenFile(confObj.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
-		if err != nil {
-			log.Printf("Could not open log file: %v\n", err.Error())
-		}
+		errLog("Could not open log file: ", err)
 
 		// Listen for the signal to close the log file
 		// in a separate thread. Passing it as an argument
@@ -69,12 +67,10 @@ func initLogging() {
 		go func(logfile *os.File) {
 
 			<-closeLog
+
 			log.Printf("Closing log file ...\n")
+			errLog("Could not close log file: ", logfile.Close())
 
-			err = logfile.Close()
-			if err != nil {
-				log.Printf("Couldn't close log file: %v\n", err.Error())
-			}
 		}(logfile)
 
 		log.SetOutput(logfile)
diff --git a/svc/db.go b/svc/db.go
index 6a614f1..36c682b 100644
--- a/svc/db.go
+++ b/svc/db.go
@@ -21,47 +21,30 @@ type dbLevel struct {
 }
 
 type dbSqlite struct {
-	db *sql.DB
-}
-
-type dbPostgres struct {
-	db *sql.DB
+	db       *sql.DB
+	pullStmt *sql.Stmt
+	pushStmt *sql.Stmt
 }
 
 // Pull DB data into cache, if available.
 func initDatabase() {
 	var db dbase
-	var err error
 
 	confObj.Mu.RLock()
 	switch confObj.DBType {
 
 	case "leveldb":
-		var lvl *leveldb.DB
-		lvl, err = leveldb.OpenFile(confObj.DBPath, nil)
+		lvl, err := leveldb.OpenFile(confObj.DBPath, nil)
+		errFatal("", err)
 		db = &dbLevel{db: lvl}
 
 	case "sqlite":
-		var lite *sql.DB
-		lite, err := sql.Open("sqlite3", confObj.DBPath)
-		errFatal("Error opening sqlite3 DB: ", err)
-		litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT PRIMARY KEY, isUser BOOL, blobKey TEXT, data BLOB)")
-		errFatal("Error preparing sqlite3 DB: ", err)
-		_, err = litePrep.Exec()
-		errFatal("Error creating sqlite3 DB: ", err)
-		db = &dbSqlite{db: lite}
-
-	case "postgres":
-		var pg *sql.DB
-		db = &dbPostgres{db: pg}
+		db = initSqlite()
 
 	}
 	confObj.Mu.RUnlock()
 
-	errFatal("", err)
-
 	dbChan <- db
-
 	pullDB()
 }
 
@@ -73,6 +56,30 @@ func dbTimer() bool {
 	return answer
 }
 
+func initSqlite() *dbSqlite {
+
+	lite, err := sql.Open("sqlite3", confObj.DBPath)
+	errFatal("Error opening sqlite3 DB: ", err)
+
+	litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)")
+	errFatal("Error preparing sqlite3 DB: ", err)
+
+	_, err = litePrep.Exec()
+	errFatal("Error creating sqlite3 DB: ", err)
+
+	push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)")
+	errFatal("", err)
+
+	pull, err := lite.Prepare("SELECT * FROM getwtxt")
+	errFatal("", err)
+
+	return &dbSqlite{
+		db:       lite,
+		pushStmt: push,
+		pullStmt: pull,
+	}
+}
+
 // Pushes the registry's cache data to a local
 // database for safe keeping.
 func pushDB() error {
@@ -90,6 +97,7 @@ func pullDB() {
 }
 
 func (lvl dbLevel) push() error {
+
 	twtxtCache.Mu.RLock()
 	var dbBasket = &leveldb.Batch{}
 	for k, v := range twtxtCache.Users {
@@ -117,9 +125,7 @@ func (lvl dbLevel) push() error {
 	confObj.LastPush = time.Now()
 	confObj.Mu.Unlock()
 
-	err := lvl.db.Write(dbBasket, nil)
-
-	return err
+	return lvl.db.Write(dbBasket, nil)
 }
 
 func (lvl dbLevel) pull() {
@@ -144,10 +150,13 @@ func (lvl dbLevel) pull() {
 		data := registry.NewUser()
 		twtxtCache.Mu.RLock()
 		if _, ok := twtxtCache.Users[urls]; ok {
+			twtxtCache.Users[urls].Mu.RLock()
 			data = twtxtCache.Users[urls]
+			twtxtCache.Users[urls].Mu.RUnlock()
 		}
 		twtxtCache.Mu.RUnlock()
 
+		data.Mu.Lock()
 		switch field {
 		case "IP":
 			data.IP = net.ParseIP(val)
@@ -164,11 +173,11 @@ func (lvl dbLevel) pull() {
 			errLog("", err)
 			data.Status[thetime] = val
 		}
+		data.Mu.Unlock()
 
 		twtxtCache.Mu.Lock()
 		twtxtCache.Users[urls] = data
 		twtxtCache.Mu.Unlock()
-
 	}
 
 	remoteRegistries.Mu.Lock()
@@ -176,24 +185,96 @@ func (lvl dbLevel) pull() {
 	remoteRegistries.Mu.Unlock()
 
 	iter.Release()
-	err := iter.Error()
-	errLog("Error while pulling DB into registry cache: ", err)
+	errLog("Error while pulling DB into registry cache: ", iter.Error())
 }
 
 func (lite dbSqlite) push() error {
+	err := lite.db.Ping()
+	if err != nil {
+		return err
+	}
+
+	twtxtCache.Mu.RLock()
+	for i, e := range twtxtCache.Users {
+		e.Mu.RLock()
+
+		_, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "uip", e.IP)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "date", e.Date)
+		errLog("", err)
+
+		for k, v := range e.Status {
+			_, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v)
+			errLog("", err)
+		}
+
+		e.Mu.RUnlock()
+	}
+	twtxtCache.Mu.RUnlock()
+
+	remoteRegistries.Mu.RLock()
+	for _, e := range remoteRegistries.List {
+		_, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL")
+		errLog("", err)
+	}
+	remoteRegistries.Mu.RUnlock()
 
 	return nil
 }
 
 func (lite dbSqlite) pull() {
+	errLog("Error pinging sqlite DB: ", lite.db.Ping())
 
-}
+	rows, err := lite.pullStmt.Query()
+	errLog("", err)
 
-func (pg dbPostgres) push() error {
+	twtxtCache.Mu.Lock()
+	for rows.Next() {
+		var urls string
+		var isUser bool
+		var dataKey string
+		var dBlob []byte
 
-	return nil
-}
+		errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob))
+
+		if !isUser {
+			remoteRegistries.Mu.Lock()
+			remoteRegistries.List = append(remoteRegistries.List, urls)
+			remoteRegistries.Mu.Unlock()
+			continue
+		}
+
+		user := registry.NewUser()
+		if _, ok := twtxtCache.Users[urls]; ok {
+			user = twtxtCache.Users[urls]
+		}
+		user.Mu.Lock()
+
+		switch dataKey {
+		case "nickname":
+			user.Nick = string(dBlob)
+		case "uip":
+			user.IP = net.ParseIP(string(dBlob))
+		case "date":
+			user.Date = string(dBlob)
+		case "rlen":
+			user.RLen = string(dBlob)
+		default:
+			thetime, err := time.Parse(time.RFC3339, dataKey)
+			errLog("While pulling statuses from SQLite: ", err)
+			user.Status[thetime] = string(dBlob)
+		}
 
-func (pg dbPostgres) pull() {
+		twtxtCache.Users[urls] = user
+		user.Mu.Unlock()
+	}
+	twtxtCache.Mu.Unlock()
 
+	remoteRegistries.Mu.Lock()
+	remoteRegistries.List = dedupe(remoteRegistries.List)
+	remoteRegistries.Mu.Unlock()
 }
diff --git a/svc/handlers.go b/svc/handlers.go
index 13555d5..5bb0d4f 100644
--- a/svc/handlers.go
+++ b/svc/handlers.go
@@ -79,9 +79,6 @@ func apiAllTweetsHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	data := parseQueryOut(out)
-	if err != nil {
-		data = []byte("")
-	}
 
 	etag := fmt.Sprintf("%x", sha256.Sum256(data))
 	w.Header().Set("ETag", etag)
@@ -140,11 +137,9 @@ func apiEndpointHandler(w http.ResponseWriter, r *http.Request) {
 		out, err = twtxtCache.QueryAllStatuses()
 		out = registry.ReduceToPage(page, out)
 	}
+	errLog("", err)
 
 	data := parseQueryOut(out)
-	if err != nil {
-		data = []byte("")
-	}
 
 	etag := fmt.Sprintf("%x", sha256.Sum256(data))
 	w.Header().Set("ETag", etag)
diff --git a/svc/init.go b/svc/init.go
index 8e9bb20..7db5303 100644
--- a/svc/init.go
+++ b/svc/init.go
@@ -101,11 +101,10 @@ func watchForInterrupt() {
 			db := <-dbChan
 
 			switch dbType := db.(type) {
-
 			case *dbLevel:
-				lvl := dbType
-				err := lvl.db.Close()
-				errLog("", err)
+				errLog("", dbType.db.Close())
+			case *dbSqlite:
+				errLog("", dbType.db.Close())
 			}
 
 			if !confObj.StdoutLogging {
diff --git a/svc/svc.go b/svc/svc.go
index 717a4d6..3130c56 100644
--- a/svc/svc.go
+++ b/svc/svc.go
@@ -112,8 +112,7 @@ func Start() {
 	}
 
 	log.Printf("Listening on %v\n", portnum)
-	err := server.ListenAndServe()
-	errLog("", err)
+	errLog("", server.ListenAndServe())
 
 	closeLog <- true
 }