Testing a sharded cache. Could be useful for massively parallel applications

This commit is contained in:
Patrick Mylund Nielsen 2012-06-22 09:24:09 +01:00
parent 0f0584a805
commit 52c269d8ae
2 changed files with 218 additions and 20 deletions

161
cache.go
View File

@ -1,8 +1,10 @@
package cache
import (
"encoding/binary"
"encoding/gob"
"fmt"
"hash/fnv"
"io"
"os"
"reflect"
@ -277,20 +279,19 @@ func (j *janitor) Run(c *cache) {
}
}
func (j *janitor) Stop() {
j.stop <- true
}
func stopJanitor(c *Cache) {
c.janitor.Stop()
c.janitor.stop <- true
}
// Return a new cache with a given default expiration duration and cleanup
// interval. If the expiration duration is less than 1, the items in the cache
// never expire (by default), and must be deleted manually. If the cleanup
// interval is less than one, expired items are not deleted from the cache
// before their next lookup or before calling DeleteExpired.
func New(de, ci time.Duration) *Cache {
func runJanitor(c *cache, ci time.Duration) {
j := &janitor{
Interval: ci,
}
c.janitor = j
go j.Run(c)
}
func newCache(de time.Duration) *cache {
if de == 0 {
de = -1
}
@ -299,21 +300,145 @@ func New(de, ci time.Duration) *Cache {
Items: map[string]*Item{},
mu: sync.Mutex{},
}
if ci > 0 {
j := &janitor{
Interval: ci,
}
c.janitor = j
go j.Run(c)
return c
}
// Return a new cache with a given default expiration duration and cleanup
// interval. If the expiration duration is less than 1, the items in the cache
// never expire (by default), and must be deleted manually. If the cleanup
// interval is less than one, expired items are not deleted from the cache
// before their next lookup or before calling DeleteExpired.
func New(defaultExpiration, cleanupInterval time.Duration) *Cache {
c := newCache(defaultExpiration)
// This trick ensures that the janitor goroutine (which--granted it
// was enabled--is running DeleteExpired on c forever) does not keep
// the returned C object from being garbage collected. When it is
// garbage collected, the finalizer stops the janitor goroutine, after
// which c can be collected.
C := &Cache{c}
if ci > 0 {
if cleanupInterval > 0 {
runJanitor(c, cleanupInterval)
runtime.SetFinalizer(C, stopJanitor)
}
return C
}
type ShardedCache struct {
*shardedCache
}
type shardedCache struct {
m uint32
cs []*cache
janitor *shardedJanitor
}
func (sc *shardedCache) index(k string) uint32 {
h := fnv.New32()
h.Write([]byte(k))
n := binary.BigEndian.Uint32(h.Sum(nil))
return n % sc.m
}
func (sc *shardedCache) Set(k string, x interface{}, d time.Duration) {
sc.cs[sc.index(k)].Set(k, x, d)
}
func (sc *shardedCache) Add(k string, x interface{}, d time.Duration) error {
return sc.cs[sc.index(k)].Add(k, x, d)
}
func (sc *shardedCache) Replace(k string, x interface{}, d time.Duration) error {
return sc.cs[sc.index(k)].Replace(k, x, d)
}
func (sc *shardedCache) Get(k string) (interface{}, bool) {
return sc.cs[sc.index(k)].Get(k)
}
func (sc *shardedCache) Increment(k string, n int64) error {
return sc.cs[sc.index(k)].Increment(k, n)
}
func (sc *shardedCache) IncrementFloat(k string, n float64) error {
return sc.cs[sc.index(k)].IncrementFloat(k, n)
}
func (sc *shardedCache) Decrement(k string, n int64) error {
return sc.cs[sc.index(k)].Decrement(k, n)
}
func (sc *shardedCache) Delete(k string) {
sc.cs[sc.index(k)].Delete(k)
}
func (sc *shardedCache) DeleteExpired() {
for _, v := range sc.cs {
v.DeleteExpired()
}
}
func (sc *shardedCache) Flush() {
for _, v := range sc.cs {
v.Flush()
}
}
type shardedJanitor struct {
Interval time.Duration
stop chan bool
}
func (j *shardedJanitor) Run(sc *shardedCache) {
j.stop = make(chan bool)
tick := time.Tick(j.Interval)
for {
select {
case <-tick:
sc.DeleteExpired()
case <-j.stop:
return
}
}
}
func stopShardedJanitor(sc *ShardedCache) {
sc.janitor.stop <- true
}
func runShardedJanitor(sc *shardedCache, ci time.Duration) {
j := &shardedJanitor{
Interval: ci,
}
sc.janitor = j
go j.Run(sc)
}
func newShardedCache(n int, de time.Duration) *shardedCache {
sc := &shardedCache{
m: uint32(n - 1),
cs: make([]*cache, n),
}
for i := 0; i < n; i++ {
c := &cache{
DefaultExpiration: de,
Items: map[string]*Item{},
mu: sync.Mutex{},
}
sc.cs[i] = c
}
return sc
}
func NewSharded(shards int, defaultExpiration, cleanupInterval time.Duration) *ShardedCache {
if defaultExpiration == 0 {
defaultExpiration = -1
}
sc := newShardedCache(shards, defaultExpiration)
SC := &ShardedCache{sc}
if cleanupInterval > 0 {
runShardedJanitor(sc, cleanupInterval)
runtime.SetFinalizer(SC, stopShardedJanitor)
}
return SC
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"io/ioutil"
"runtime"
"strconv"
"sync"
"testing"
"time"
@ -636,18 +637,22 @@ func TestSerializeUnserializable(t *testing.T) {
}
func BenchmarkCacheGet(b *testing.B) {
b.StopTimer()
tc := New(0, 0)
tc.Set("foo", "bar", 0)
b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Get("foo")
}
}
func BenchmarkMutexMapGet(b *testing.B) {
b.StopTimer()
m := map[string]string{
"foo": "bar",
}
mu := sync.Mutex{}
b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
_, _ = m["foo"]
@ -656,12 +661,14 @@ func BenchmarkMutexMapGet(b *testing.B) {
}
func BenchmarkCacheGetConcurrent(b *testing.B) {
b.StopTimer()
tc := New(0, 0)
tc.Set("foo", "bar", 0)
wg := new(sync.WaitGroup)
workers := runtime.NumCPU()
each := b.N / workers
wg.Add(workers)
b.StartTimer()
for i := 0; i < workers; i++ {
go func() {
for j := 0; j < each; j++ {
@ -674,6 +681,7 @@ func BenchmarkCacheGetConcurrent(b *testing.B) {
}
func BenchmarkMutexMapGetConcurrent(b *testing.B) {
b.StopTimer()
m := map[string]string{
"foo": "bar",
}
@ -682,6 +690,7 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) {
workers := runtime.NumCPU()
each := b.N / workers
wg.Add(workers)
b.StartTimer()
for i := 0; i < workers; i++ {
go func() {
for j := 0; j < each; j++ {
@ -695,16 +704,72 @@ func BenchmarkMutexMapGetConcurrent(b *testing.B) {
wg.Wait()
}
func BenchmarkCacheSet(b *testing.B) {
func BenchmarkCacheGetManyConcurrent(b *testing.B) {
// This is the same as BenchmarkCacheGetConcurrent, but its result
// can be compared against BenchmarkShardedCacheGetManyConcurrent.
b.StopTimer()
n := 10000
tc := New(0, 0)
keys := make([]string, n)
for i := 0; i < n; i++ {
k := "foo" + strconv.Itoa(n)
keys[i] = k
tc.Set(k, "bar", 0)
}
each := b.N / n
wg := new(sync.WaitGroup)
wg.Add(n)
for _, v := range keys {
go func() {
for j := 0; j < each; j++ {
tc.Get(v)
}
wg.Done()
}()
}
b.StartTimer()
wg.Wait()
}
func BenchmarkShardedCacheGetManyConcurrent(b *testing.B) {
b.StopTimer()
n := 10000
tsc := NewSharded(20, 0, 0)
keys := make([]string, n)
for i := 0; i < n; i++ {
k := "foo" + strconv.Itoa(n)
keys[i] = k
tsc.Set(k, "bar", 0)
}
each := b.N / n
wg := new(sync.WaitGroup)
wg.Add(n)
for _, v := range keys {
go func() {
for j := 0; j < each; j++ {
tsc.Get(v)
}
wg.Done()
}()
}
b.StartTimer()
wg.Wait()
}
func BenchmarkCacheSet(b *testing.B) {
b.StopTimer()
tc := New(0, 0)
b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Set("foo", "bar", 0)
}
}
func BenchmarkMutexMapSet(b *testing.B) {
b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"
@ -713,7 +778,9 @@ func BenchmarkMutexMapSet(b *testing.B) {
}
func BenchmarkCacheSetDelete(b *testing.B) {
b.StopTimer()
tc := New(0, 0)
b.StartTimer()
for i := 0; i < b.N; i++ {
tc.Set("foo", "bar", 0)
tc.Delete("foo")
@ -721,8 +788,10 @@ func BenchmarkCacheSetDelete(b *testing.B) {
}
func BenchmarkMutexMapSetDelete(b *testing.B) {
b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"
@ -734,7 +803,9 @@ func BenchmarkMutexMapSetDelete(b *testing.B) {
}
func BenchmarkCacheSetDeleteSingleLock(b *testing.B) {
b.StopTimer()
tc := New(0, 0)
b.StartTimer()
for i := 0; i < b.N; i++ {
tc.mu.Lock()
tc.set("foo", "bar", 0)
@ -744,8 +815,10 @@ func BenchmarkCacheSetDeleteSingleLock(b *testing.B) {
}
func BenchmarkMutexMapSetDeleteSingleLock(b *testing.B) {
b.StopTimer()
m := map[string]string{}
mu := sync.Mutex{}
b.StartTimer()
for i := 0; i < b.N; i++ {
mu.Lock()
m["foo"] = "bar"