Thread Safety in Go: Protecting Shared Data
Understanding Thread Safety
Thread safety ensures that multiple goroutines can safely access shared resources without causing data races or inconsistencies. In Go, we achieve this through several mechanisms:
- Mutexes
- Read-Write Mutexes
- Atomic Operations
- Channels
Let's implement a thread-safe queue to demonstrate these concepts. the below code demonstrates the implementation of a thread-safe queue using mutexes, read-write mutexes, and channels.
Implementation of concurrent thread-safe queue
package main
import (
"fmt"
"sync"
)
// Traditional mutex-based queue
type SafeConcurrentQueue struct {
mu sync.Mutex
queue []int32
}
func (q *SafeConcurrentQueue) Enqueue(item int32) {
q.mu.Lock()
defer q.mu.Unlock()
q.queue = append(q.queue,item)
}
func (q *SafeConcurrentQueue) Dequeue() int32 {
q.mu.Lock()
if(len(q.queue)==0){
panic("empty queue");
}
defer q.mu.Unlock()
item := q.queue[0]
q.queue = q.queue[1:]
return item
}
func (q *SafeConcurrentQueue) Size() int{
q.mu.Lock()
defer q.mu.Unlock();
return len(q.queue);
}
// RW mutex-based queue for better read performance
type RWSafeConcurrentQueue struct {
mu sync.RWMutex
queue []int32
}
func (q *RWSafeConcurrentQueue) Enqueue(item int32) {
q.mu.Lock()
defer q.mu.Unlock()
q.queue = append(q.queue,item)
}
func (q *RWSafeConcurrentQueue) Dequeue() int32 {
q.mu.Lock()
if(len(q.queue)==0){
panic("empty queue");
}
defer q.mu.Unlock()
item := q.queue[0]
q.queue = q.queue[1:]
return item
}
func (q *RWSafeConcurrentQueue) Size() int{
q.mu.RLock()
defer q.mu.RUnlock();
return len(q.queue);
}
type ChannelQueue struct {
enqueue chan int32
dequeue chan int32
size chan int
done chan struct{}
}
func NewChannelQueue() *ChannelQueue {
q := &ChannelQueue{
enqueue: make(chan int32),
dequeue: make(chan int32),
size: make(chan int),
done: make(chan struct{}),
}
go q.run()
return q
}
func (q *ChannelQueue) run() {
queue := []int32{}
for {
if len(queue) == 0 {
select {
case item := <-q.enqueue:
queue = append(queue, item)
case q.size <- len(queue):
case <-q.done:
return
}
} else {
select {
case item := <-q.enqueue:
queue = append(queue, item)
case q.dequeue <- queue[0]:
queue = queue[1:]
case q.size <- len(queue):
case <-q.done:
return
}
}
}
}
func (q *ChannelQueue) Enqueue(item int32) {
q.enqueue <- item
}
func (q *ChannelQueue) Dequeue() (int32, bool) {
select {
case item := <-q.dequeue:
return item, true
default:
return 0, false
}
}
func (q *ChannelQueue) Size() int {
return <-q.size
}
func (q *ChannelQueue) Stop() {
close(q.done)
}
var wgE sync.WaitGroup
var wgD sync.WaitGroup
func main() {
// Example usage with multiple goroutines
queue := &RWSafeConcurrentQueue{
queue:make([]int32,0),
}
//for channel based queue implementaion
// queue :=NewChannelQueue();
for i := 0; i < 100000; i++ {
wgE.Add(1)
go func() {
defer wgE.Done()
queue.Enqueue(1)
}()
}
wgE.Wait()
fmt.Printf("Final count: %d\n", queue.Size())
//for channel based queue implementaion
// queue.Stop()
}
Best Practices
-
Choose the right synchronization primitive:
- Use
sync.Mutex
for simple exclusive access - Use
sync.RWMutex
when reads significantly outnumber writes - Use atomic operations for simple queues and flags
- Use channels for communication between goroutines
- Use
-
Always protect shared data:
- Keep mutex and protected data in the same struct
- Use
defer
for unlocking - Minimize the critical section
-
Consider performance implications:
- RWMutex has overhead compared to Mutex
- Channel operations are more expensive than mutex operations
- Atomic operations are fastest for simple cases
Common Pitfalls
- Forgetting to unlock mutexes
- copying mutex values instead of using pointers
- Holding locks during expensive operations
- Using mutexes when atomic operations would suffice
The implementation above demonstrates these concepts with practical examples you can use in your applications.