summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--worker/notmuch/message.go91
-rw-r--r--worker/notmuch/worker.go86
2 files changed, 147 insertions, 30 deletions
diff --git a/worker/notmuch/message.go b/worker/notmuch/message.go
index 077fb92..aa16cee 100644
--- a/worker/notmuch/message.go
+++ b/worker/notmuch/message.go
@@ -17,13 +17,15 @@ import (
 )
 
 type Message struct {
-	uid uint32
-	key string
-	msg *notmuch.Message
+	uid     uint32
+	key     string
+	msg     *notmuch.Message
+	rwDB    func() (*notmuch.DB, error) // used to open a db for writing
+	refresh func(*Message) error        // called after msg modification
 }
 
 // NewReader reads a message into memory and returns an io.Reader for it.
-func (m Message) NewReader() (io.Reader, error) {
+func (m *Message) NewReader() (io.Reader, error) {
 	f, err := os.Open(m.msg.Filename())
 	if err != nil {
 		return nil, err
@@ -37,13 +39,13 @@ func (m Message) NewReader() (io.Reader, error) {
 }
 
 // MessageInfo populates a models.MessageInfo struct for the message.
-func (m Message) MessageInfo() (*models.MessageInfo, error) {
+func (m *Message) MessageInfo() (*models.MessageInfo, error) {
 	return lib.MessageInfo(m)
 }
 
 // NewBodyPartReader creates a new io.Reader for the requested body part(s) of
 // the message.
-func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
+func (m *Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
 	f, err := os.Open(m.msg.Filename())
 	if err != nil {
 		return nil, err
@@ -57,7 +59,7 @@ func (m Message) NewBodyPartReader(requestedParts []int) (io.Reader, error) {
 }
 
 // MarkRead either adds or removes the maildir.FlagSeen flag from the message.
-func (m Message) MarkRead(seen bool) error {
+func (m *Message) MarkRead(seen bool) error {
 	haveUnread := false
 	for _, t := range m.tags() {
 		if t == "unread" {
@@ -71,14 +73,14 @@ func (m Message) MarkRead(seen bool) error {
 	}
 
 	if haveUnread {
-		err := m.msg.RemoveTag("unread")
+		err := m.RemoveTag("unread")
 		if err != nil {
 			return err
 		}
 		return nil
 	}
 
-	err := m.msg.AddTag("unread")
+	err := m.AddTag("unread")
 	if err != nil {
 		return err
 	}
@@ -86,7 +88,7 @@ func (m Message) MarkRead(seen bool) error {
 }
 
 // tags returns the notmuch tags of a message
-func (m Message) tags() []string {
+func (m *Message) tags() []string {
 	ts := m.msg.Tags()
 	var tags []string
 	var tag *notmuch.Tag
@@ -96,7 +98,72 @@ func (m Message) tags() []string {
 	return tags
 }
 
-func (m Message) ModelFlags() ([]models.Flag, error) {
+func (m *Message) modify(cb func(*notmuch.Message) error) error {
+	db, err := m.rwDB()
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+	msg, err := db.FindMessage(m.key)
+	if err != nil {
+		return err
+	}
+	err = cb(msg)
+	if err != nil {
+		return err
+	}
+	// we need to explicitly close here, else we don't commit
+	dcerr := db.Close()
+	if dcerr != nil && err == nil {
+		err = dcerr
+	}
+	// next we need to refresh the notmuch msg, else we serve stale tags
+	rerr := m.refresh(m)
+	if rerr != nil && err == nil {
+		err = rerr
+	}
+	return err
+}
+
+func (m *Message) AddTag(tag string) error {
+	err := m.modify(func(msg *notmuch.Message) error {
+		return msg.AddTag(tag)
+	})
+	return err
+}
+
+func (m *Message) AddTags(tags []string) error {
+	err := m.modify(func(msg *notmuch.Message) error {
+		ierr := msg.Atomic(func(msg *notmuch.Message) {
+			for _, t := range tags {
+				msg.AddTag(t)
+			}
+		})
+		return ierr
+	})
+	return err
+}
+
+func (m *Message) RemoveTag(tag string) error {
+	err := m.modify(func(msg *notmuch.Message) error {
+		return msg.RemoveTag(tag)
+	})
+	return err
+}
+
+func (m *Message) RemoveTags(tags []string) error {
+	err := m.modify(func(msg *notmuch.Message) error {
+		ierr := msg.Atomic(func(msg *notmuch.Message) {
+			for _, t := range tags {
+				msg.RemoveTag(t)
+			}
+		})
+		return ierr
+	})
+	return err
+}
+
+func (m *Message) ModelFlags() ([]models.Flag, error) {
 	var flags []models.Flag
 	seen := true
 
@@ -118,6 +185,6 @@ func (m Message) ModelFlags() ([]models.Flag, error) {
 	return flags, nil
 }
 
-func (m Message) UID() uint32 {
+func (m *Message) UID() uint32 {
 	return m.uid
 }
diff --git a/worker/notmuch/worker.go b/worker/notmuch/worker.go
index 6187b24..d4c196c 100644
--- a/worker/notmuch/worker.go
+++ b/worker/notmuch/worker.go
@@ -29,7 +29,7 @@ type worker struct {
 	w            *types.Worker
 	pathToDB     string
 	db           *notmuch.DB
-	selected     *notmuch.Query
+	query        string
 	uidStore     *uidstore.Store
 	excludedTags []string
 	nameQueryMap map[string]string
@@ -129,12 +129,36 @@ func (w *worker) handleConfigure(msg *types.Configure) error {
 	return nil
 }
 
-func (w *worker) handleConnect(msg *types.Connect) error {
+// connectRW returns a writable notmuch DB, which needs to be closed to commit
+// the changes and to release the DB lock
+func (w *worker) connectRW() (*notmuch.DB, error) {
+	db, err := notmuch.Open(w.pathToDB, notmuch.DBReadWrite)
+	if err != nil {
+		return nil, fmt.Errorf("could not connect to notmuch db: %v", err)
+	}
+	return db, err
+}
+
+// connectRO connects a RO db to the worker
+func (w *worker) connectRO() error {
+	if w.db != nil {
+		if err := w.db.Close(); err != nil {
+			w.w.Logger.Printf("connectRO: could not close the old db: %v", err)
+		}
+	}
 	var err error
-	w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadWrite)
+	w.db, err = notmuch.Open(w.pathToDB, notmuch.DBReadOnly)
 	if err != nil {
 		return fmt.Errorf("could not connect to notmuch db: %v", err)
 	}
+	return nil
+}
+
+func (w *worker) handleConnect(msg *types.Connect) error {
+	err := w.connectRO()
+	if err != nil {
+		return err
+	}
 	w.done(msg)
 	return nil
 }
@@ -153,21 +177,32 @@ func (w *worker) handleListDirectories(msg *types.ListDirectories) error {
 	return nil
 }
 
+//query returns a query based on the query string on w.query.
+//it also configures the query as specified on the worker
+func (w *worker) getQuery() (*notmuch.Query, error) {
+	q := w.db.NewQuery(w.query)
+	q.SetExcludeScheme(notmuch.EXCLUDE_TRUE)
+	q.SetSortScheme(notmuch.SORT_OLDEST_FIRST)
+	for _, t := range w.excludedTags {
+		err := q.AddTagExclude(t)
+		if err != nil && err != notmuch.ErrIgnored {
+			return nil, err
+		}
+	}
+	return q, nil
+}
+
 func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
 	w.w.Logger.Printf("opening %s", msg.Directory)
 	// try the friendly name first, if that fails assume it's a query
-	query, ok := w.nameQueryMap[msg.Directory]
+	q, ok := w.nameQueryMap[msg.Directory]
 	if !ok {
-		query = msg.Directory
+		q = msg.Directory
 	}
-	w.selected = w.db.NewQuery(query)
-	w.selected.SetExcludeScheme(notmuch.EXCLUDE_TRUE)
-	w.selected.SetSortScheme(notmuch.SORT_OLDEST_FIRST)
-	for _, t := range w.excludedTags {
-		err := w.selected.AddTagExclude(t)
-		if err != nil && err != notmuch.ErrIgnored {
-			return err
-		}
+	w.query = q
+	query, err := w.getQuery()
+	if err != nil {
+		return err
 	}
 	//TODO: why does this need to be sent twice??
 	info := &types.DirectoryInfo{
@@ -176,7 +211,7 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
 			Flags:    []string{},
 			ReadOnly: false,
 			// total messages
-			Exists: w.selected.CountMessages(),
+			Exists: query.CountMessages(),
 			// new messages since mailbox was last opened
 			Recent: 0,
 			// total unread
@@ -191,7 +226,11 @@ func (w *worker) handleOpenDirectory(msg *types.OpenDirectory) error {
 
 func (w *worker) handleFetchDirectoryContents(
 	msg *types.FetchDirectoryContents) error {
-	uids, err := w.uidsFromQuery(w.selected)
+	q, err := w.getQuery()
+	if err != nil {
+		return err
+	}
+	uids, err := w.uidsFromQuery(q)
 	if err != nil {
 		w.w.Logger.Printf("error scanning uids: %v", err)
 		return err
@@ -253,9 +292,20 @@ func (w *worker) msgFromUid(uid uint32) (*Message, error) {
 		return nil, fmt.Errorf("Could not fetch message for key %q: %v", key, err)
 	}
 	msg := &Message{
-		key: key,
-		uid: uid,
-		msg: nm,
+		key:  key,
+		uid:  uid,
+		msg:  nm,
+		rwDB: w.connectRW,
+		refresh: func(m *Message) error {
+			//close the old message manually, else we segfault during gc
+			m.msg.Close()
+			err := w.connectRO()
+			if err != nil {
+				return err
+			}
+			m.msg, err = w.db.FindMessage(m.key)
+			return err
+		},
 	}
 	return msg, nil
 }