summary refs log tree commit diff stats
path: root/svc
diff options
context:
space:
mode:
Diffstat (limited to 'svc')
-rw-r--r--svc/cache.go12
-rw-r--r--svc/conf.go10
-rw-r--r--svc/db.go149
-rw-r--r--svc/handlers.go7
-rw-r--r--svc/init.go7
-rw-r--r--svc/svc.go3
6 files changed, 127 insertions, 61 deletions
diff --git a/svc/cache.go b/svc/cache.go
index 04ed2cd..0963892 100644
--- a/svc/cache.go
+++ b/svc/cache.go
@@ -40,8 +40,7 @@ func cacheAndPush() {
 			refreshCache()
 		}
 		if dbTimer() {
-			err := pushDB()
-			errLog("Error pushing cache to database: ", err)
+			errLog("Error pushing cache to database: ", pushDB())
 		}
 		time.Sleep(1000 * time.Millisecond)
 	}
@@ -57,16 +56,14 @@ func refreshCache() {
 	twtxtCache.Mu.RLock()
 	for k := range twtxtCache.Users {
 		twtxtCache.Mu.RUnlock()
-		err := twtxtCache.UpdateUser(k)
-		errLog("", err)
+		errLog("", twtxtCache.UpdateUser(k))
 		twtxtCache.Mu.RLock()
 	}
 	twtxtCache.Mu.RUnlock()
 
 	remoteRegistries.Mu.RLock()
 	for _, v := range remoteRegistries.List {
-		err := twtxtCache.CrawlRemoteRegistry(v)
-		errLog("Error refreshing local copy of remote registry data: ", err)
+		errLog("Error refreshing local copy of remote registry data: ", twtxtCache.CrawlRemoteRegistry(v))
 	}
 	remoteRegistries.Mu.RUnlock()
 	confObj.Mu.Lock()
@@ -101,9 +98,8 @@ func pingAssets() {
 		buf := bytes.NewBuffer(b)
 
 		confObj.Mu.RLock()
-		err = tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance)
+		errLog("", tmpls.ExecuteTemplate(buf, "index.html", confObj.Instance))
 		confObj.Mu.RUnlock()
-		errLog("", err)
 
 		staticCache.mu.Lock()
 		staticCache.index = buf.Bytes()
diff --git a/svc/conf.go b/svc/conf.go
index 24ff305..0aaee14 100644
--- a/svc/conf.go
+++ b/svc/conf.go
@@ -58,9 +58,7 @@ func initLogging() {
 	} else {
 
 		logfile, err := os.OpenFile(confObj.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
-		if err != nil {
-			log.Printf("Could not open log file: %v\n", err.Error())
-		}
+		errLog("Could not open log file: ", err)
 
 		// Listen for the signal to close the log file
 		// in a separate thread. Passing it as an argument
@@ -69,12 +67,10 @@ func initLogging() {
 		go func(logfile *os.File) {
 
 			<-closeLog
+
 			log.Printf("Closing log file ...\n")
+			errLog("Could not close log file: ", logfile.Close())
 
-			err = logfile.Close()
-			if err != nil {
-				log.Printf("Couldn't close log file: %v\n", err.Error())
-			}
 		}(logfile)
 
 		log.SetOutput(logfile)
diff --git a/svc/db.go b/svc/db.go
index 6a614f1..36c682b 100644
--- a/svc/db.go
+++ b/svc/db.go
@@ -21,47 +21,30 @@ type dbLevel struct {
 }
 
 type dbSqlite struct {
-	db *sql.DB
-}
-
-type dbPostgres struct {
-	db *sql.DB
+	db       *sql.DB
+	pullStmt *sql.Stmt
+	pushStmt *sql.Stmt
 }
 
 // Pull DB data into cache, if available.
 func initDatabase() {
 	var db dbase
-	var err error
 
 	confObj.Mu.RLock()
 	switch confObj.DBType {
 
 	case "leveldb":
-		var lvl *leveldb.DB
-		lvl, err = leveldb.OpenFile(confObj.DBPath, nil)
+		lvl, err := leveldb.OpenFile(confObj.DBPath, nil)
+		errFatal("", err)
 		db = &dbLevel{db: lvl}
 
 	case "sqlite":
-		var lite *sql.DB
-		lite, err := sql.Open("sqlite3", confObj.DBPath)
-		errFatal("Error opening sqlite3 DB: ", err)
-		litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT PRIMARY KEY, isUser BOOL, blobKey TEXT, data BLOB)")
-		errFatal("Error preparing sqlite3 DB: ", err)
-		_, err = litePrep.Exec()
-		errFatal("Error creating sqlite3 DB: ", err)
-		db = &dbSqlite{db: lite}
-
-	case "postgres":
-		var pg *sql.DB
-		db = &dbPostgres{db: pg}
+		db = initSqlite()
 
 	}
 	confObj.Mu.RUnlock()
 
-	errFatal("", err)
-
 	dbChan <- db
-
 	pullDB()
 }
 
@@ -73,6 +56,30 @@ func dbTimer() bool {
 	return answer
 }
 
+func initSqlite() *dbSqlite {
+
+	lite, err := sql.Open("sqlite3", confObj.DBPath)
+	errFatal("Error opening sqlite3 DB: ", err)
+
+	litePrep, err := lite.Prepare("CREATE TABLE IF NOT EXISTS getwtxt (urlKey TEXT, isUser BOOL, dataKey TEXT, data BLOB)")
+	errFatal("Error preparing sqlite3 DB: ", err)
+
+	_, err = litePrep.Exec()
+	errFatal("Error creating sqlite3 DB: ", err)
+
+	push, err := lite.Prepare("INSERT OR REPLACE INTO getwtxt(urlKey, isUser, dataKey, data) VALUES(?, ?, ?, ?)")
+	errFatal("", err)
+
+	pull, err := lite.Prepare("SELECT * FROM getwtxt")
+	errFatal("", err)
+
+	return &dbSqlite{
+		db:       lite,
+		pushStmt: push,
+		pullStmt: pull,
+	}
+}
+
 // Pushes the registry's cache data to a local
 // database for safe keeping.
 func pushDB() error {
@@ -90,6 +97,7 @@ func pullDB() {
 }
 
 func (lvl dbLevel) push() error {
+
 	twtxtCache.Mu.RLock()
 	var dbBasket = &leveldb.Batch{}
 	for k, v := range twtxtCache.Users {
@@ -117,9 +125,7 @@ func (lvl dbLevel) push() error {
 	confObj.LastPush = time.Now()
 	confObj.Mu.Unlock()
 
-	err := lvl.db.Write(dbBasket, nil)
-
-	return err
+	return lvl.db.Write(dbBasket, nil)
 }
 
 func (lvl dbLevel) pull() {
@@ -144,10 +150,13 @@ func (lvl dbLevel) pull() {
 		data := registry.NewUser()
 		twtxtCache.Mu.RLock()
 		if _, ok := twtxtCache.Users[urls]; ok {
+			twtxtCache.Users[urls].Mu.RLock()
 			data = twtxtCache.Users[urls]
+			twtxtCache.Users[urls].Mu.RUnlock()
 		}
 		twtxtCache.Mu.RUnlock()
 
+		data.Mu.Lock()
 		switch field {
 		case "IP":
 			data.IP = net.ParseIP(val)
@@ -164,11 +173,11 @@ func (lvl dbLevel) pull() {
 			errLog("", err)
 			data.Status[thetime] = val
 		}
+		data.Mu.Unlock()
 
 		twtxtCache.Mu.Lock()
 		twtxtCache.Users[urls] = data
 		twtxtCache.Mu.Unlock()
-
 	}
 
 	remoteRegistries.Mu.Lock()
@@ -176,24 +185,96 @@ func (lvl dbLevel) pull() {
 	remoteRegistries.Mu.Unlock()
 
 	iter.Release()
-	err := iter.Error()
-	errLog("Error while pulling DB into registry cache: ", err)
+	errLog("Error while pulling DB into registry cache: ", iter.Error())
 }
 
 func (lite dbSqlite) push() error {
+	err := lite.db.Ping()
+	if err != nil {
+		return err
+	}
+
+	twtxtCache.Mu.RLock()
+	for i, e := range twtxtCache.Users {
+		e.Mu.RLock()
+
+		_, err = lite.pushStmt.Exec(i, true, "nickname", e.Nick)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "rlen", e.RLen)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "uip", e.IP)
+		errLog("", err)
+		_, err = lite.pushStmt.Exec(i, true, "date", e.Date)
+		errLog("", err)
+
+		for k, v := range e.Status {
+			_, err = lite.pushStmt.Exec(i, true, k.Format(time.RFC3339), v)
+			errLog("", err)
+		}
+
+		e.Mu.RUnlock()
+	}
+	twtxtCache.Mu.RUnlock()
+
+	remoteRegistries.Mu.RLock()
+	for _, e := range remoteRegistries.List {
+		_, err = lite.pushStmt.Exec(e, false, "REMOTE REGISTRY", "NULL")
+		errLog("", err)
+	}
+	remoteRegistries.Mu.RUnlock()
 
 	return nil
 }
 
 func (lite dbSqlite) pull() {
+	errLog("Error pinging sqlite DB: ", lite.db.Ping())
 
-}
+	rows, err := lite.pullStmt.Query()
+	errLog("", err)
 
-func (pg dbPostgres) push() error {
+	twtxtCache.Mu.Lock()
+	for rows.Next() {
+		var urls string
+		var isUser bool
+		var dataKey string
+		var dBlob []byte
 
-	return nil
-}
+		errLog("", rows.Scan(&urls, &isUser, &dataKey, &dBlob))
+
+		if !isUser {
+			remoteRegistries.Mu.Lock()
+			remoteRegistries.List = append(remoteRegistries.List, urls)
+			remoteRegistries.Mu.Unlock()
+			continue
+		}
+
+		user := registry.NewUser()
+		if _, ok := twtxtCache.Users[urls]; ok {
+			user = twtxtCache.Users[urls]
+		}
+		user.Mu.Lock()
+
+		switch dataKey {
+		case "nickname":
+			user.Nick = string(dBlob)
+		case "uip":
+			user.IP = net.ParseIP(string(dBlob))
+		case "date":
+			user.Date = string(dBlob)
+		case "rlen":
+			user.RLen = string(dBlob)
+		default:
+			thetime, err := time.Parse(time.RFC3339, dataKey)
+			errLog("While pulling statuses from SQLite: ", err)
+			user.Status[thetime] = string(dBlob)
+		}
 
-func (pg dbPostgres) pull() {
+		twtxtCache.Users[urls] = user
+		user.Mu.Unlock()
+	}
+	twtxtCache.Mu.Unlock()
 
+	remoteRegistries.Mu.Lock()
+	remoteRegistries.List = dedupe(remoteRegistries.List)
+	remoteRegistries.Mu.Unlock()
 }
diff --git a/svc/handlers.go b/svc/handlers.go
index 13555d5..5bb0d4f 100644
--- a/svc/handlers.go
+++ b/svc/handlers.go
@@ -79,9 +79,6 @@ func apiAllTweetsHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	data := parseQueryOut(out)
-	if err != nil {
-		data = []byte("")
-	}
 
 	etag := fmt.Sprintf("%x", sha256.Sum256(data))
 	w.Header().Set("ETag", etag)
@@ -140,11 +137,9 @@ func apiEndpointHandler(w http.ResponseWriter, r *http.Request) {
 		out, err = twtxtCache.QueryAllStatuses()
 		out = registry.ReduceToPage(page, out)
 	}
+	errLog("", err)
 
 	data := parseQueryOut(out)
-	if err != nil {
-		data = []byte("")
-	}
 
 	etag := fmt.Sprintf("%x", sha256.Sum256(data))
 	w.Header().Set("ETag", etag)
diff --git a/svc/init.go b/svc/init.go
index 8e9bb20..7db5303 100644
--- a/svc/init.go
+++ b/svc/init.go
@@ -101,11 +101,10 @@ func watchForInterrupt() {
 			db := <-dbChan
 
 			switch dbType := db.(type) {
-
 			case *dbLevel:
-				lvl := dbType
-				err := lvl.db.Close()
-				errLog("", err)
+				errLog("", dbType.db.Close())
+			case *dbSqlite:
+				errLog("", dbType.db.Close())
 			}
 
 			if !confObj.StdoutLogging {
diff --git a/svc/svc.go b/svc/svc.go
index 717a4d6..3130c56 100644
--- a/svc/svc.go
+++ b/svc/svc.go
@@ -112,8 +112,7 @@ func Start() {
 	}
 
 	log.Printf("Listening on %v\n", portnum)
-	err := server.ListenAndServe()
-	errLog("", err)
+	errLog("", server.ListenAndServe())
 
 	closeLog <- true
 }
00 committer Kartik K. Agaram <vc@akkartik.com> 2015-05-07 15:29:13 -0700 1298 - better ingredient/product handling' href='/akkartik/mu/commit/036call_ingredient.cc?h=main&id=0487a30e7078861ed7de42bdb21b5c71fb9b54a1'>0487a30e ^
67573caf ^
ac0e9db5 ^
0487a30e ^
67573caf ^
69f04c3f ^
4be9a93b ^
3e849f11 ^
9dcbec39 ^
75aa3a98 ^
0487a30e ^
75aa3a98 ^
2d29369f ^
0487a30e ^
4be9a93b ^
69f04c3f ^

717ab659 ^
4a943d4e ^














3e849f11 ^
4a943d4e ^


















717ab659 ^



795f5244 ^
4a48bedc ^
166e3c0d ^



717ab659 ^

67573caf ^
717ab659 ^


23d3a022 ^

4a943d4e ^















717ab659 ^



795f5244 ^
4a48bedc ^
166e3c0d ^
717ab659 ^
166e3c0d ^
2b250717 ^
e4630643 ^

5f98a10c ^
9dcbec39 ^
e4630643 ^

166e3c0d ^



b24eb476 ^
67573caf ^
0487a30e ^
67573caf ^
ac0e9db5 ^
827898fc ^
67573caf ^
717ab659 ^

ac0e9db5 ^
0487a30e ^

827898fc ^
717ab659 ^



c9a5a7ba ^
78c50205 ^
c9a5a7ba ^
78c50205 ^

c9a5a7ba ^
af023b32 ^

1fb0cf9e ^
af023b32 ^


1fb0cf9e ^
af023b32 ^
93d4cc93 ^
1fb0cf9e ^
93d4cc93 ^
c9a5a7ba ^
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220