summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--worker/types/worker.go56
-rw-r--r--worker/worker.go7
2 files changed, 42 insertions, 21 deletions
diff --git a/worker/types/worker.go b/worker/types/worker.go
index 1ff16a4..0ed216a 100644
--- a/worker/types/worker.go
+++ b/worker/types/worker.go
@@ -2,6 +2,7 @@ package types
 
 import (
 	"log"
+	"sync"
 )
 
 type Backend interface {
@@ -9,11 +10,42 @@ type Backend interface {
 }
 
 type Worker struct {
-	Actions   chan WorkerMessage
-	Backend   Backend
-	Callbacks map[WorkerMessage]func(msg WorkerMessage)
-	Messages  chan WorkerMessage
-	Logger    *log.Logger
+	Backend  Backend
+	Actions  chan WorkerMessage
+	Messages chan WorkerMessage
+	Logger   *log.Logger
+
+	callbacks map[WorkerMessage]func(msg WorkerMessage) // protected by mutex
+	mutex     sync.Mutex
+}
+
+func NewWorker(logger *log.Logger) *Worker {
+	return &Worker{
+		Actions:   make(chan WorkerMessage, 50),
+		Messages:  make(chan WorkerMessage, 50),
+		Logger:    logger,
+		callbacks: make(map[WorkerMessage]func(msg WorkerMessage)),
+	}
+}
+
+func (worker *Worker) setCallback(msg WorkerMessage,
+	cb func(msg WorkerMessage)) {
+
+	if cb != nil {
+		worker.mutex.Lock()
+		worker.callbacks[msg] = cb
+		worker.mutex.Unlock()
+	}
+}
+
+func (worker *Worker) getCallback(msg WorkerMessage) (func(msg WorkerMessage),
+	bool) {
+
+	worker.mutex.Lock()
+	cb, ok := worker.callbacks[msg]
+	worker.mutex.Unlock()
+
+	return cb, ok
 }
 
 func (worker *Worker) PostAction(msg WorkerMessage,
@@ -26,9 +58,7 @@ func (worker *Worker) PostAction(msg WorkerMessage,
 	}
 	worker.Actions <- msg
 
-	if cb != nil {
-		worker.Callbacks[msg] = cb
-	}
+	worker.setCallback(msg, cb)
 }
 
 func (worker *Worker) PostMessage(msg WorkerMessage,
@@ -41,9 +71,7 @@ func (worker *Worker) PostMessage(msg WorkerMessage,
 	}
 	worker.Messages <- msg
 
-	if cb != nil {
-		worker.Callbacks[msg] = cb
-	}
+	worker.setCallback(msg, cb)
 }
 
 func (worker *Worker) ProcessMessage(msg WorkerMessage) WorkerMessage {
@@ -52,9 +80,8 @@ func (worker *Worker) ProcessMessage(msg WorkerMessage) WorkerMessage {
 	} else {
 		worker.Logger.Printf("(ui)<= %T\n", msg)
 	}
-	if cb, ok := worker.Callbacks[msg.InResponseTo()]; ok {
+	if cb, ok := worker.getCallback(msg.InResponseTo()); ok {
 		cb(msg)
-		delete(worker.Callbacks, msg)
 	}
 	return msg
 }
@@ -65,9 +92,8 @@ func (worker *Worker) ProcessAction(msg WorkerMessage) WorkerMessage {
 	} else {
 		worker.Logger.Printf("<-(ui) %T\n", msg)
 	}
-	if cb, ok := worker.Callbacks[msg.InResponseTo()]; ok {
+	if cb, ok := worker.getCallback(msg.InResponseTo()); ok {
 		cb(msg)
-		delete(worker.Callbacks, msg)
 	}
 	return msg
 }
diff --git a/worker/worker.go b/worker/worker.go
index 959c7df..29bb560 100644
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -15,12 +15,7 @@ func NewWorker(source string, logger *log.Logger) (*types.Worker, error) {
 	if err != nil {
 		return nil, err
 	}
-	worker := &types.Worker{
-		Actions:   make(chan types.WorkerMessage, 50),
-		Callbacks: make(map[types.WorkerMessage]func(msg types.WorkerMessage)),
-		Messages:  make(chan types.WorkerMessage, 50),
-		Logger:    logger,
-	}
+	worker := types.NewWorker(logger)
 	switch u.Scheme {
 	case "imap":
 		fallthrough