分类目录归档:cache

memcache start parameters

【黑体字的参数较为常用】

-p<num> 监听的TCP端口(默认:11211)
-U<num> UDP监听端口(默认:11211 0关闭)
-d 以守护进程方式运行
-u<username> 指定用户运行
-m<num>. 最大内存使用,单位MB。默认64MB
-c<num> 最大同时连接数,默认是1024
-v 输出警告和错误消息
-vv 打印客户端的请求和返回信息
-h 帮助信息
-l<ip> 绑定地址(默认任何ip地址都可以访问)
-P<file> 将PID保存在file文件
-i 打印memcached和libevent版权信息
-M 禁止LRU策略,内存耗尽时返回错误
-f<factor> 增长因子,默认1.25
-n<bytes> 初始chunk=key+suffix+value+32结构体,默认48字节
-L 启用大内存页,可以降低内存浪费,改进性能
-l

调整分配slab页的大小,默认1M,最小1k到128M

-t<num>

线程数,默认4。由于memcached采用NIO,所以更多线程没有太多作用

-R

每个event连接最大并发数,默认20

-C

禁用CAS命令(可以禁止版本计数,减少开销)

-b

Set the backlog queue limit (default: 1024)

-B

Binding protocol-one of ascii, binary or auto (default)

-s<file>

UNIX socket

-a<mask>

access mask for UNIX socket, in octal (default: 0700)

5、Memcache指令汇总

指令 描述 例子
get key #返回对应的value get mykey
set key 标识符 有效时间 长度 key不存在添加,存在更新 set mykey 0 60 5
add key标识符 有效时间 长度 #添加key-value值,返回stored/not_stored add mykey 0 60 5
replace key标识符 有效时间 长度 #替换key中的value,key存在成功返回stored,key不存在失败返回not_stored replace mykey 0 60 5
append key标识符 有效时间 长度 #追加key中的value值,成功返回stored,失败返回not_stored append mykey 0 60 5
prepend key标识符 有效时间 长度 #前置追加key中的value值,成功返回stored,失败返回not_stored prepend mykey 0 60 5
incr key num #给key中的value增加num。若key中不是数字,则将使用num替换value值。返回增加后的value Incre mykey 1
decr #同上 同上
delete key [key2…] 删除一个或者多个key-value。成功删除返回deleted,不存在失败则返回not_found delete mykey
flush_all [timeount] #清除所有[timeout时间内的]键值,但不会删除items,所以memcache依旧占用内存 flush_all 20
version #返回版本号 version
verbosity #日志级别 verbosity
quit #关闭连接 quit
stats #返回Memcache通用统计信息 stats
stats slabs #返回Memcache运行期间创建的每个slab的信息 stats slabs
stats items #返回各个slab中item的个数,和最老的item秒数 stats items
stats malloc #显示内存分配数据 stats malloc
stats detail [on|off|dump] #on:打开详细操作记录、off:关闭详细操作记录、dump显示详细操作记录(每一个键的get、set、hit、del的次数) stats detail on

stats detail off

stats detail dump

stats cachedump slab_id limit_num #显示slab_id中前limit_num个key stats cachedump 1 2
stats reset #清空统计数据 stats reset
stats settings #查看配置设置 stats settings
stats sizes #展示了固定chunk大小中的items的数量 Stats sizes

注意:标识符:一个十六进制无符号的整数(以十进制来表示),需和数据一起存储,get的时候一起返回

memcached code analyse

名词

Slab
用于表示存储的最大size数据,仅仅只是用于定义(通俗的讲就是表示可以存储数据大小的范围)。默认情况下,前后两个slab表示存储的size以1.25倍进行增长。例如slab1为96字节,slab2为120字节

Page
分配给Slab的内存空间,默认为1MB。分给Slab后将会根据slab的大小切割成chunk

Chunk
用于缓存记录的内存空间

Slab calss
特定大小的Chunk集合

增长因子

增长因子就是相邻两个chunk之间的增长倍数。 memcache默认是1.25。
2倍 chunk size增长。
 
 

struct item:

#define ITEM_LINKED 1 //该item插入到LRU队列了
#define ITEM_CAS 2 //该item使用CAS
#define ITEM_SLABBED 4 //该item还在slab的空闲队列里面,没有分配出去
#define ITEM_FETCHED 8 //该item插入到LRU队列后,被worker线程访问过

typedef struct _stritem {
    struct _stritem *next;//next指针,用于LRU链表
    struct _stritem *prev;//prev指针,用于LRU链表
    struct _stritem *h_next;//h_next指针,用于哈希表的冲突链
    rel_time_t      time;//最后一次访问时间。绝对时间
    rel_time_t      exptime;//过期失效时间,绝对时间
    int             nbytes;//本item存放的数据的长度   
    unsigned short  refcount;//本item的引用数
    uint8_t         nsuffix;//后缀长度    /* length of flags-and-length string */
    uint8_t         it_flags;//item的属性   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;//键值的长度       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

struct slabclass:

typedef struct {
    unsigned int size;      /* sizes of items */
    unsigned int perslab;   /* how many items per slab */

    void *slots;           /* list of item ptrs */
    unsigned int sl_curr;   /* total free items in list */

    void *end_page_ptr;         /* pointer to next free item at end of page, or 0 */
    unsigned int end_page_free; /* number of items remaining at end of last alloced page */

    unsigned int slabs;     /* how many slabs were allocated for this class */

    void **slab_list;       /* array of slab pointers */
    unsigned int list_size; /* size of prev array */

    unsigned int killing;  /* index+1 of dying slab, or zero if none */
    size_t requested; /* The number of requested bytes */
} slabclass_t;


item_link_q:

将item插入到LRU队列的头部
//将item插入到LRU队列的头部
static void item_link_q(item *it) { /* item is the new head */
    item **head, **tail;
    assert(it->slabs_clsid < LARGEST_ID); assert((it->it_flags & ITEM_SLABBED) == 0);

    head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];
    assert(it != *head);
    assert((*head && *tail) || (*head == 0 && *tail == 0));

	//头插法插入该item
    it->prev = 0;
    it->next = *head;
    if (it->next) it->next->prev = it;
    *head = it;//该item作为对应链表的第一个节点

	//如果该item是对应id上的第一个item,那么还会被认为是该id链上的最后一个item
	//因为在head那里使用头插法,所以第一个插入的item,到了后面确实成了最后一个item
    if (*tail == 0) *tail = it;
    sizes[it->slabs_clsid]++;//个数加一
    return;
}

item_unlink_q:

将item从对应的LRU队列中删除
static void item_unlink_q(item *it) {
    item **head, **tail;
    assert(it->slabs_clsid < LARGEST_ID); head = &heads[it->slabs_clsid];
    tail = &tails[it->slabs_clsid];

	
    if (*head == it) {//链表上的第一个节点
        assert(it->prev == 0);
        *head = it->next;
    }
    if (*tail == it) {//链表上的最后一个节点
        assert(it->next == 0);
        *tail = it->prev;
    }
    assert(it->next != it);
    assert(it->prev != it);

	//把item的前驱节点和后驱节点连接起来
    if (it->next) it->next->prev = it->prev;
    if (it->prev) it->prev->next = it->next;
    sizes[it->slabs_clsid]--;//个数减一
    return;
}


do_item_update:

更新item
#define ITEM_UPDATE_INTERVAL 60 //更新频率为60秒

void do_item_update(item *it) {
	//下面的代码可以看到update操作是耗时的。如果这个item频繁被访问,
	//那么会导致过多的update,过多的一系列费时操作。此时更新间隔就应运而生
	//了。如果上一次的访问时间(也可以说是update时间)距离现在(current_time)
	//还在更新间隔内的,就不更新。超出了才更新。
    if (it->time < current_time - ITEM_UPDATE_INTERVAL) { mutex_lock(&cache_lock); if ((it->it_flags & ITEM_LINKED) != 0) {
            item_unlink_q(it);//从LRU队列中删除
            it->time = current_time;//更新访问时间
            item_link_q(it);//插入到LRU队列的头部
        }
        mutex_unlock(&cache_lock);
    }
}

do_slabs_newslab:

分配一个新的 slab
static int do_slabs_newslab(const unsigned int id) {
    slabclass_t *p = &slabclass[id];
    int len = settings.slab_reassign ? settings.item_size_max
        : p->size * p->perslab;
    char *ptr;

    if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0) ||
        (grow_slab_list(id) == 0) ||
        ((ptr = memory_allocate((size_t)len)) == 0)) {

        MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
        return 0;
    }

    memset(ptr, 0, (size_t)len);
    p->end_page_ptr = ptr;
    p->end_page_free = p->perslab;

    p->slab_list[p->slabs++] = ptr;
    mem_malloced += len;
    MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

    return 1;
}

do_slabs_alloc:

从指定的 slabclass, 即 slabclass[id], 分配大小为 size 的内存块供申请者使用.
分配的原则是, 优先从 slots 指向的空闲链表中分配, 空闲链表没有, 才从 slab 中分配一个空闲的 chunk.
static void *do_slabs_alloc(const size_t size, unsigned int id) {
    slabclass_t *p;
    void *ret = NULL;
    item *it = NULL;

    if (id < POWER_SMALLEST || id > power_largest) {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
        return NULL;
    }

    p = &slabclass[id];
    assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);

 /* 如果不使用 slab 机制, 则直接从操作系统分配 */
#ifdef USE_SYSTEM_MALLOC
    if (mem_limit && mem_malloced + size > mem_limit) {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
        return 0;
    }
    mem_malloced += size;
    ret = malloc(size);
    MEMCACHED_SLABS_ALLOCATE(size, id, 0, ret);
    return ret;
#endif

    /* fail unless we have space at the end of a recently allocated page,
       we have something on our freelist, or we could allocate a new page */
 /* 确保最后一个slab 有空闲的chunk */
    if (! (p->end_page_ptr != 0 || p->sl_curr != 0 ||
           do_slabs_newslab(id) != 0)) {
        /* We don't have more memory available */
        ret = NULL;
    } else if (p->sl_curr != 0) {
  /* 从空闲list分配一个item */
        /* return off our freelist */
        it = (item *)p->slots;
        p->slots = it->next;
        if (it->next) it->next->prev = 0;
        p->sl_curr--;
        ret = (void *)it;
    } else {
  /* 从最后一个slab中分配一个空闲chunk */
        /* if we recently allocated a whole page, return from that */
        assert(p->end_page_ptr != NULL);
        ret = p->end_page_ptr;
        if (--p->end_page_free != 0) {
            p->end_page_ptr = ((caddr_t)p->end_page_ptr) + p->size;
        } else {
            p->end_page_ptr = 0;
        }
    }

    if (ret) {
        p->requested += size;
        MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
    } else {
        MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
    }

    return ret;
}

do_item_alloc:

从 slab 系统分配一个空闲 item
item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
    uint8_t nsuffix;
    item *it = NULL;
    char suffix[40];
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;

    mutex_lock(&cache_lock);
    /* do a quick check if we have any expired items in the tail.. */
    item *search;
    rel_time_t oldest_live = settings.oldest_live;

    search = tails[id];
    if (search != NULL && (refcount_incr(&search->refcount) == 2)) {
  /* 先检查 LRU 队列最后一个 item 是否超时, 超时的话就把这个 item 分配给用户 */
        if ((search->exptime != 0 && search->exptime < current_time) || (search->time <= oldest_live && oldest_live <= current_time)) { // dead by flush STATS_LOCK(); stats.reclaimed++; STATS_UNLOCK(); itemstats[id].reclaimed++; if ((search->it_flags & ITEM_FETCHED) == 0) {
                STATS_LOCK();
                stats.expired_unfetched++;
                STATS_UNLOCK();
                itemstats[id].expired_unfetched++;
            }
            it = search;
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
   /* 把这个 item 从 LRU 队列和哈希表中移除 */
            do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
            /* Initialize the item block: */
            it->slabs_clsid = 0;
  /* 没有超时的 item, 那就尝试从 slabclass 分配, 运气不好的话, 分配失败,
     那就把 LRU 队列最后一个 item 剔除, 然后分配给用户 */
        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {
            if (settings.evict_to_free == 0) {
                itemstats[id].outofmemory++;
                pthread_mutex_unlock(&cache_lock);
                return NULL;
            }
            itemstats[id].evicted++;
            itemstats[id].evicted_time = current_time - search->time;
            if (search->exptime != 0)
                itemstats[id].evicted_nonzero++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                STATS_LOCK();
                stats.evicted_unfetched++;
                STATS_UNLOCK();
                itemstats[id].evicted_unfetched++;
            }
            STATS_LOCK();
            stats.evictions++;
            STATS_UNLOCK();
            it = search;
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
   /* 把这个 item 从 LRU 队列和哈希表中移除 */
            do_item_unlink_nolock(it, hash(ITEM_key(it), it->nkey, 0));
            /* Initialize the item block: */
            it->slabs_clsid = 0;
        } else {
            refcount_decr(&search->refcount);
        }
 /* LRU 队列是空的, 或者锁住了, 那就只能从 slabclass 分配内存 */
    } else {
        /* If the LRU is empty or locked, attempt to allocate memory */
        it = slabs_alloc(ntotal, id);
        if (search != NULL)
            refcount_decr(&search->refcount);
    }

    if (it == NULL) {
        itemstats[id].outofmemory++;
        /* Last ditch effort. There was a very rare bug which caused
         * refcount leaks. We leave this just in case they ever happen again.
         * We can reasonably assume no item can stay locked for more than
         * three hours, so if we find one in the tail which is that old,
         * free it anyway.
         */
        if (search != NULL &&
            search->refcount != 2 &&
            search->time + TAIL_REPAIR_TIME < current_time) { itemstats[id].tailrepairs++; search->refcount = 1;
            do_item_unlink_nolock(search, hash(ITEM_key(search), search->nkey, 0));
        }
        pthread_mutex_unlock(&cache_lock);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    assert(it != heads[id]);

 /* 顺便对 item 做一些初始化 */
    /* Item initialization can happen outside of the lock; the item's already
     * been removed from the slab LRU.
     */
    it->refcount = 1;     /* the caller will have a reference */
    pthread_mutex_unlock(&cache_lock);
    it->next = it->prev = it->h_next = 0;
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}

do_item_link

形成了一个完成的 item 后, 就要把它放入两个数据结构中, 一是 memcached 的哈希表,
memcached 运行过程中只有一个哈希表, 二是 item 所在的 slabclass 的 LRU 队列.
每个 slabclass 都有一个 LRU 队列
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    mutex_lock(&cache_lock);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats.curr_bytes += ITEM_ntotal(it);
    stats.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
 /* 把 item 放入哈希表 */
    assoc_insert(it, hv);
 /* 把 item 放入 LRU 队列*/
    item_link_q(it);
    refcount_incr(&it->refcount);
    pthread_mutex_unlock(&cache_lock);

    return 1;
}
 

do_item_unlink

do_item_unlink 与 do_item_unlink 做的工作相反, 把 item 从哈希表和 LRU 队列中删除,
而且还释放掉 item 所占的内存 (其实只是把 item 放到空闲链表中).
void do_item_unlink(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_UNLINK(ITEM_key(it), it->nkey, it->nbytes);
    mutex_lock(&cache_lock);
    if ((it->it_flags & ITEM_LINKED) != 0) {
        it->it_flags &= ~ITEM_LINKED;
        STATS_LOCK();
        stats.curr_bytes -= ITEM_ntotal(it);
        stats.curr_items -= 1;
        STATS_UNLOCK();
  /* 从哈希表中删除 item */
        assoc_delete(ITEM_key(it), it->nkey, hv);
  /* 从 LRU 队列中删除 item */
        item_unlink_q(it);
  /* 释放 item 所占的内存 */
        do_item_remove(it);
    }
    pthread_mutex_unlock(&cache_lock);
}
 

item_get

根据 key 找对应的 item
/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
    mutex_lock(&cache_lock);
 /* 从哈希表中找 item */
    item *it = assoc_find(key, nkey, hv);
    if (it != NULL) {
        refcount_incr(&it->refcount);
        /* Optimization for slab reassignment. prevents popular items from
         * jamming in busy wait. Can only do this here to satisfy lock order
         * of item_lock, cache_lock, slabs_lock. */
        if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) { do_item_unlink_nolock(it, hv); do_item_remove(it); it = NULL; } } pthread_mutex_unlock(&cache_lock); int was_found = 0; if (settings.verbose > 2) {
        if (it == NULL) {
            fprintf(stderr, "> NOT FOUND %s", key);
        } else {
            fprintf(stderr, "> FOUND KEY %s", ITEM_key(it));
            was_found++;
        }
    }

 /* 找到了, 然后检查是否超时 */
    if (it != NULL) {
        if (settings.oldest_live != 0 && settings.oldest_live <= current_time && it->time <= settings.oldest_live) { do_item_unlink(it, hv); do_item_remove(it); it = NULL; if (was_found) { fprintf(stderr, " -nuked by flush"); } } else if (it->exptime != 0 && it->exptime <= current_time) { do_item_unlink(it, hv); do_item_remove(it); it = NULL; if (was_found) { fprintf(stderr, " -nuked by expire"); } } else { it->it_flags |= ITEM_FETCHED;
            DEBUG_REFCNT(it, '+');
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, "\n");

    return it;
}

slabs_clsid:

根据大小判定slab class
unsigned int slabs_clsid(const size_t size) {//返回slabclass索引下标值
    int res = POWER_SMALLEST;//res的初始值为1

	//返回0表示查找失败,因为slabclass数组中,第一个元素是没有使用的
    if (size == 0)
        return 0;
	
	//因为slabclass数组中各个元素能分配的item大小是升序的
	//所以从小到大直接判断即可在数组找到最小但又能满足的元素
    while (size > slabclass[res].size)
        if (res++ == power_largest)     /* won't fit in the biggest slab */
            return 0;
    return res;
}

 

 
Links:
http://blog.csdn.net/luotuo44/article/details/42963793
 

WARNING overcommit_memory , vm.overcommit_memory=1

来源:http://m.blog.csdn.net/article/details?id=50864933

 

Redis 3.0.7版本启动时出现警告的解决办法

7283:M 12 Mar 12:13:33.749 # WARNING: The
TCP backlog setting of 511 cannot be enforced because
/proc/sys/net/core/somaxconn is set to the lower value of 128.

7283:M 12 Mar 12:13:33.749 # Server started, Redis version 3.0.7
7283:M 12 Mar 12:13:33.749 # WARNING
overcommit_memory is set to 0! Background save may fail under low memory
condition. To fix this issue add 'vm.overcommit_memory = 1' to
/etc/sysctl.conf and then reboot or run the command 'sysctl
vm.overcommit_memory=1' for this to take effect.

7283:M 12 Mar 12:13:33.749 * The server is now ready to accept connections on port 6379

第1个警告(WARNING: The TCP backlog setting of 511 ......)解决办法
方法1: 临时设置生效: sysctl -w net.core.somaxconn = 1024
方法2: 永久生效: 修改/etc/sysctl.conf文件,增加一行
net.core.somaxconn= 1024
然后执行命令
sysctl -p

补充:
net.core.somaxconn是linux中的一个kernel参数,表示socket监听(listen)的backlog上限。
backlog是socket的监听队列,当一个请求(request)尚未被处理或建立时,他会进入backlog。
而socket server可以一次性处理backlog中的所有请求,处理后的请求不再位于监听队列中。
当server处理请求较慢,以至于监听队列被填满后,新来的请求会被拒绝。
所以说net.core.somaxconn限制了接收新 TCP 连接侦听队列的大小。
对于一个经常处理新连接的高负载 web服务环境来说,默认的 128 太小了。大多数环境这个值建议增加到 1024 或者更多。

第2个警告(WARNING overcommit_memory is set to 0! ......)同样也有两个解决办法
方法1: 临时设置生效: sysctl -w vm.overcommit_memory = 1
方法2: 永久生效: 修改/etc/sysctl.conf文件,增加一行
vm.overcommit_memory = 1
然后执行命令
sysctl -p

补充: 
overcommit_memory参数说明:
设置内存分配策略(可选,根据服务器的实际情况进行设置)
/proc/sys/vm/overcommit_memory
可选值:0、1、2。
0, 表示内核将检查是否有足够的可用内存供应用进程使用;如果有足够的可用内存,内存申请允许;否则,内存申请失败,并把错误返回给应用进程。
1, 表示内核允许分配所有的物理内存,而不管当前的内存状态如何。
2, 表示内核允许分配超过所有物理内存和交换空间总和的内存
注意:redis在dump数据的时候,会fork出一个子进程,理论上child进程所占用的内存和parent是一样的,比如parent占用的内存
为8G,这个时候也要同样分配8G的内存给child,如果内存无法负担,往往会造成redis服务器的down机或者IO负载过高,效率下降。所以这里
比较优化的内存分配策略应该设置为 1(表示内核允许分配所有的物理内存,而不管当前的内存状态如何)。

 

Redis开源的分布式缓存

3.0.7 安装一例:


yum install gcc tcl -y
wget http://download.redis.io/releases/redis-3.0.7.tar.gz
tar xzf redis-3.0.7.tar.gz
cd redis-3.0.7
make
make test
make PREFIX=/usr/local/redis install

按默认添加自动启动
./utils/install_server.sh

手工添
mkdir -p /usr/local/redis/etc
cp redis.conf /usr/local/redis/etc/
mkdir /etc/redis​
ln -s /usr/local/redis/etc/redis.conf /etc/redis/6379.conf
cp utils/redis_init_script /etc/init.d/redis

vi /etc/init.d/redis
在头部 !/bin/sh 下一行添
chkconfig: 2345 80 30
启动的优先级80,关闭的优先级是30
保存后执行:
chkconfig --add redis
chkconfig redis on

# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize yes

 

配置一些参数配置:
#echo vm.overcommit_memory = 1 >> /etc/sysctl.conf 修改内存检测方式允许使用所有物理内存,防止高负载时性能严重下降

#echo vm.dirty_bytes=33554432 >> /etc/sysctl.conf 为了解决aof导致的busy问题,让系统从内存往硬盘刷数据的大小由默认的内存的10%减小到32M

#echo never > /sys/kernel/mm/transparent_hugepage/enabled 关闭巨透明页,centos6以上版本需要,如果改了需要重启,否则不用重启

#sysctl -p 立刻生效

 

Redis按照官方的定义是一个开源的,高级的键值存储。本文就想扩展开介绍,高级和存储两点。

为什么高级,咱们列一下它的基本特性:

  • 每秒10万+的读,8万+的写(是不是有点吹?)
  • 操作原子性(还支持把一组命令合并为一个原子操作)
  • 多种基元数据类型的支持(hash/list/(sorted)set/string)
  • 支持过期(可以用作缓存)
  • 支持主从架构(多级层次)
  • 支持管道(一次性发送多个命令执行)

以及丰富的API:

  • 提供丰富有关key的操作命令
    • 移动/排序/删除/重命名/获取类型/是否存在
    • 获得符合某个表达式的所有键
    • 设置和删除过期时间
    • 随机获得一个
  • 提供丰富的list结构操作命令
    • 获取列表长度
    • 获取列表中一段元素
    • 移除一个或若干项
    • 从最后开始移除若干项
    • 追加一个项
    • 为列表最前加入一项
    • 移除并获取第一项
    • 移除并获取最后一项
    • 按照索引号获取或设置项
    • 移除A列表最后一项追加到B列表
    • 在列表中某个项(根据值)前或后加入项
    • 获取项的索引号
  • 提供丰富的string操作命令
    • 设置(允许有过期)和追加值
    • 获取值的一段
    • 递减和递增
    • 减少和增加
    • 设置新值并且返回老的值
    • 一次获取和设置多个键的值
    • 获取值的长度
  • 提供丰富的set结构操作命令
    • 增加
    • 批量增加
    • 交集(并且保存到另外一个集合)
    • 并集(并且保存到另外一个集合)
    • 从一个移除加入另一个
    • 获取长度
    • 随机获取(并移除)
    • 是否存在
    • 获取所有
  • 提供丰富的hash结构操作命令
  • 提供丰富的sorted set操作命令
  • 支持主从(并且支持多层)
  • 提供发布订阅功能
    • ******某个信道(符合某个表达式)的消息
    • 向某个信道发布消息
  • 提供事务功能
    • 开始事务块
    • 抛弃之前的所有命令
    • 执行事务块中的所有命令
    • 监视某个键的事务执行
  • 其它
    • 授权
    • 改变数据库
    • 在线配置
    • 异步刷新数据到磁盘
    • 同步刷新数据到磁盘
    • 获取状态信息
    • 获得调试信息
    • 删除所有数据
    • 关闭
  • 支持SET if Not eXists(可以用作分布式锁)

这么多API总结起来:

  • 如此多的基于服务端的命令可以实现各种各样的逻辑(这里重点突出服务端这个词,虽然我们知道很多情况下可以get后再set,或是getall后再setall,但是这效率无法和服务端直接操作相比)
  • 很多命令都组合成了原子命令(这是很重要的,使得我们不需要在客户端做分布式锁)
  • 创新的list/set/sorted set存储结构,比单纯的key/value丰富多了

接下来说说存储这个词,很多人把它和memcached来比较,其实Redis的配置和架构是如此灵活,我们想怎么用就怎么用,想怎么调优就怎么调,可以为我们的应用来定制存储架构:

  • 设置数据刷新到磁盘的条件(几秒内几次改动)
  • 设置数据刷新到磁盘的方式(总是/每秒/不刷新)
  • 设置虚拟内存(最大内存/页大小/页数量)
  • 设置限制(内存占用/并发数/端口/是否后台)

对于用作缓存还是存储,数据的安全性等我们都可以根据不同的应用来调整。那么Redis可以用作哪些应用呢?

  • 分布式缓存
  • 分布式锁
  • 消息队列
  • 全文索引
  • 和业务逻辑绑定的存储
  • 发布订阅
  • 分布式锁
  • 博客的例子

更多信息,参阅:

 

REDIS WINDOWS 下配置

Windows版的Redis可到此处下载,非官方版
http://code.google.com/p/servicestack/wiki/RedisWindowsDownload

redis.conf
复制以下===与====之内的内容
=================================
# Redis configuration file example

# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize no

# When run as a daemon, Redis write a pid file in /var/run/redis.pid by default.
# You can specify a custom pid file location here.
pidfile /var/run/redis.pid

# Accept connections on the specified port, default is 6379
port 6379

# If you want you can bind a single interface, if the bind option is not
# specified all the interfaces will listen for connections.
#
# bind 127.0.0.1

# Close the connection after a client is idle for N seconds (0 to disable)
timeout 300

# Set server verbosity to 'debug'
# it can be one of:
# debug (a lot of information, useful for development/testing)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel debug

# Specify the log file name. Also 'stdout' can be used to force
# the demon to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile stdout

# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16

################################ SNAPSHOTTING  #################################
#
# Save the DB on disk:
#
#   save <seconds> <changes>
#
#   Will save the DB if both the given number of seconds and the given
#   number of write operations against the DB occurred.
#
#   In the example below the behaviour will be to save:
#   after 900 sec (15 min) if at least 1 key changed
#   after 300 sec (5 min) if at least 10 keys changed
#   after 60 sec if at least 10000 keys changed
save 900 1
save 300 10
save 60 10000

# Compress string objects using LZF when dump .rdb databases?
# For default that's set to 'yes' as it's almost always a win.
# If you want to save some CPU in the saving child set it to 'no' but
# the dataset will likely be bigger if you have compressible values or keys.
rdbcompression yes

# The filename where to dump the DB
dbfilename dump.rdb

# For default save/load DB in/from the working directory
# Note that you must specify a directory not a file name.
dir ./

################################# REPLICATION #################################

# Master-Slave replication. Use slaveof to make a Redis instance a copy of
# another Redis server. Note that the configuration is local to the slave
# so for example it is possible to configure the slave to save the DB with a
# different interval, or to listen to another port, and so on.
#
# slaveof <masterip> <masterport>

# If the master is password protected (using the "requirepass" configuration
# directive below) it is possible to tell the slave to authenticate before
# starting the replication synchronization process, otherwise the master will
# refuse the slave request.
#
# masterauth <master-password>

################################## SECURITY ###################################

# Require clients to issue AUTH <PASSWORD> before processing any other
# commands.  This might be useful in environments in which you do not trust
# others with access to the host running redis-server.
#
# This should stay commented out for backward compatibility and because most
# people do not need auth (e.g. they run their own servers).
#
# requirepass foobared

################################### LIMITS ####################################

# Set the max number of connected clients at the same time. By default there
# is no limit, and it's up to the number of file descriptors the Redis process
# is able to open. The special value '0' means no limts.
# Once the limit is reached Redis will close all the new connections sending
# an error 'max number of clients reached'.
#
# maxclients 128

# Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys with an
# EXPIRE set. It will try to start freeing keys that are going to expire
# in little time and preserve keys with a longer time to live.
# Redis will also try to remove objects from free lists if possible.
#
# If all this fails, Redis will start to reply with errors to commands
# that will use more memory, like SET, LPUSH, and so on, and will continue
# to reply to most read-only commands like GET.
#
# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
# 'state' server or cache, not as a real DB. When Redis is used as a real
# database the memory usage will grow over the weeks, it will be obvious if
# it is going to use too much memory in the long run, and you'll have the time
# to upgrade. With maxmemory after the limit is reached you'll start to get
# errors for write operations, and this may even lead to DB inconsistency.
#
# maxmemory <bytes>

############################## APPEND ONLY MODE ###############################

# By default Redis asynchronously dumps the dataset on disk. If you can live
# with the idea that the latest records will be lost if something like a crash
# happens this is the preferred way to run Redis. If instead you care a lot
# about your data and don't want to that a single record can get lost you should
# enable the append only mode: when this mode is enabled Redis will append
# every write operation received in the file appendonly.log. This file will
# be read on startup in order to rebuild the full dataset in memory.
#
# Note that you can have both the async dumps and the append only file if you
# like (you have to comment the "save" statements above to disable the dumps).
# Still if append only mode is enabled Redis will load the data from the
# log file at startup ignoring the dump.rdb file.
#
# The name of the append only file is "appendonly.log"
#
# IMPORTANT: Check the BGREWRITEAOF to check how to rewrite the append
# log file in background when it gets too big.

appendonly no

# The fsync() call tells the Operating System to actually write data on disk
# instead to wait for more data in the output buffer. Some OS will really flush
# data on disk, some other OS will just try to do it ASAP.
#
# Redis supports three different modes:
#
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log . Slow, Safest.
# everysec: fsync only if one second passed since the last fsync. Compromise.
#
# The default is "always" that's the safer of the options. It's up to you to
# understand if you can relax this to "everysec" that will fsync every second
# or to "no" that will let the operating system flush the output buffer when
# it want, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting).

appendfsync always
# appendfsync everysec
# appendfsync no

############################### ADVANCED CONFIG ###############################

# Glue small output buffers together in order to send small replies in a
# single TCP packet. Uses a bit more CPU but most of the times it is a win
# in terms of number of queries per second. Use 'yes' if unsure.
glueoutputbuf yes

# Use object sharing. Can save a lot of memory if you have many common
# string in your dataset, but performs lookups against the shared objects
# pool so it uses more CPU and can be a bit slower. Usually it's a good
# idea.
#
# When object sharing is enabled (shareobjects yes) you can use
# shareobjectspoolsize to control the size of the pool used in order to try
# object sharing. A bigger pool size will lead to better sharing capabilities.
# In general you want this value to be at least the double of the number of
# very common strings you have in your dataset.
#
# WARNING: object sharing is experimental, don't enable this feature
# in production before of Redis 1.0-stable. Still please try this feature in
# your development environment so that we can test it better.
# shareobjects no
# shareobjectspoolsize 1024

================================================
将以上内容存储为redis.conf

解压下载的redis包。我的windows下的解压地址是D:\redis-2.0.2
将redis.conf 拷贝到D:\redis-2.0.2\下

指定redis的配置文件,如没有指定,则使用默认设置
D:\redis-2.0.2>redis-server.exe redis.conf

如下图则开启正常
点击查看原图

新开一个cmd命令窗口
redis-cli.exe:命令行客户端,测试用
D:\redis-2.0.2>redis-cli.exe -h 192.168.10.59 -p 6379
192.168.10.59 是我本地的地址

如下图则

点击查看原图

玩redis吧~~
点击查看原图

------------------------------------------------------------------------------------------------

另一摘要

------------------------------------------------------------------------------------------------

window平台Redis安装

下载地址: http://code.google.com/p/servicestack/wiki/RedisWindowsDownload

Redis文件夹有以下几个文件

redis-server.exe:服务程序

redis-check-dump.exe:本地数据库检查

redis-check-aof.exe:更新日志检查

redis-benchmark.exe:性能测试,用以模拟同时由N个客户端发送M个 SETs/GETs 查询 (类似于 Apache 的ab 工具).

指定redis的配置文件,如没有指定,则使用默认设置

解压目录:

d:\>redis-server.exe

redis-cli.exe:命令行客户端,测试用.windows下没有redis.conf配置文件.  

解压目录:
d:\>redis-cli.exe -h 127.0.0.1 -p 6379

使用方法有两种:一种是直接使用redis-cli.exe 后面加操作,另一种是直接输入redis-cli.exe进入管理界面,然后直接在redis提示符下输入命令即可.

设置一个Key并获取返回的值:

$ ./redis-cli set mykey somevalue

OK

$ ./redis-cli get mykey

Somevalue

如何添加值到list:

$ ./redis-cli lpush mylist firstvalue

OK

$ ./redis-cli lpush mylist secondvalue

OK

$ ./redis-cli lpush mylist thirdvalue

OK

$ ./redis-cli lrange mylist 0 -1

1. thirdvalue

2. secondvalue

3. firstvalue

$ ./redis-cli rpop mylist

firstvalue

$ ./redis-cli lrange mylist 0 -1

1. thirdvalue

2. secondvalue

redis-benchmark.exe:性能测试,用以模拟同时由N个客户端发送M个 SETs/GETs 查询 (类似于 Apache 的 ab 工具).

./redis-benchmark -n 100000 –c 50

====== SET ======

100007 requests completed in 0.88 seconds (译者注:100004 查询完成于 1.14 秒 )

50 parallel clients (译者注:50个并发客户端)

3 bytes payload (译者注:3字节有效载荷)

keep alive: 1 (译者注:保持1个连接)

58.50% <= 0 milliseconds(译者注:毫秒)

99.17% <= 1 milliseconds

99.58% <= 2 milliseconds

99.85% <= 3 milliseconds

99.90% <= 6 milliseconds

100.00% <= 9 milliseconds

114293.71 requests per second(译者注:每秒 114293.71 次查询)

......

Windows下测试并发客户端极限为60

 

linux平台Redis安装:

wget http://code.google.com/p/redis/downloads/detail?name=redis-2.0.4.tar.gz
tar xvzf redis-2.0.4.tar.gz
cd  redis-2.0.4
make
mkdir /home/redis
cp redis-server  /home/redis
cp redis-benchmark  /home/redis
cp redis-cli  /home/redis
cp redis.conf  /home/redis
cd  /home/redis

启动

./redis-server redis.conf

进入命令交互模式,两种:

1:   ./redis-cli

2:   telnet 127.0.0.1 6379       (ip接端口)

在安装的时候要注意系统时间要正确,不然可能会提示"make[1]: warning: Clock skew detected. Your build may be incomplete"错误!

=============================================================

配置文件参数说明:

 

1. Redis默认不是以守护进程的方式运行,可以通过该配置项修改,使用yes启用守护进程

daemonize no

2. 当Redis以守护进程方式运行时,Redis默认会把pid写入/var/run/redis.pid文件,可以通过pidfile指定

pidfile /var/run/redis.pid

3. 指定Redis监听端口,默认端口为6379,作者在自己的一篇博文中解释了为什么选用6379作为默认端口,因为6379在手机按键上MERZ对应的号码,而MERZ取自意大利歌女Alessia Merz的名字

port 6379

4. 绑定的主机地址

bind 127.0.0.1

5.当 客户端闲置多长时间后关闭连接,如果指定为0,表示关闭该功能

timeout 300

6. 指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为verbose

loglevel verbose

7. 日志记录方式,默认为标准输出,如果配置Redis为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给/dev/null

logfile stdout

8. 设置数据库的数量,默认数据库为0,可以使用SELECT <dbid>命令在连接上指定数据库id

databases 16

9. 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合

save <seconds> <changes>

Redis默认配置文件中提供了三个条件:

save 900 1

save 300 10

save 60 10000

分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更改。

10. 指定存储至本地数据库时是否压缩数据,默认为yes,Redis采用LZF压缩,如果为了节省CPU时间,可以关闭该选项,但会导致数据库文件变的巨大

rdbcompression yes

11. 指定本地数据库文件名,默认值为dump.rdb

dbfilename dump.rdb

12. 指定本地数据库存放目录

dir ./

13. 设置当本机为slav服务时,设置master服务的IP地址及端口,在Redis启动时,它会自动从master进行数据同步

slaveof <masterip> <masterport>

14. 当master服务设置了密码保护时,slav服务连接master的密码

masterauth <master-password>

15. 设置Redis连接密码,如果配置了连接密码,客户端在连接Redis时需要通过AUTH <password>命令提供密码,默认关闭

requirepass foobared

16. 设置同一时间最大客户端连接数,默认无限制,Redis可以同时打开的客户端连接数为Redis进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis会关闭新的连接并向客户端返回max number of clients reached错误信息

maxclients 128

17. 指定Redis最大内存限制,Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区

maxmemory <bytes>

18. 指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为no

appendonly no

19. 指定更新日志文件名,默认为appendonly.aof

appendfilename appendonly.aof

20. 指定更新日志条件,共有3个可选值:

no:表示等操作系统进行数据缓存同步到磁盘(快)

always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)

everysec:表示每秒同步一次(折衷,默认值)

appendfsync everysec

21. 指定是否启用虚拟内存机制,默认值为no,简单的介绍一下,VM机制将数据分页存放,由Redis将访问量较少的页即冷数据swap到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析Redis的VM机制)

vm-enabled no

22. 虚拟内存文件路径,默认值为/tmp/redis.swap,不可多个Redis实例共享

vm-swap-file /tmp/redis.swap

23. 将所有大于vm-max-memory的数据存入虚拟内存,无论vm-max-memory设置多小,所有索引数据都是内存存储的(Redis的索引数据 就是keys),也就是说,当vm-max-memory设置为0的时候,其实是所有value都存在于磁盘。默认值为0

vm-max-memory 0

24. Redis swap文件分成了很多的page,一个对象可以保存在多个page上面,但一个page上不能被多个对象共享,vm-page-size是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page大小最好设置为32或者64bytes;如果存储很大大对象,则可以使用更大的page,如果不 确定,就使用默认值

vm-page-size 32

25. 设置swap文件中的page数量,由于页表(一种表示页面空闲或使用的bitmap)是在放在内存中的,,在磁盘上每8个pages将消耗1byte的内存。

vm-pages 134217728

26. 设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为4

vm-max-threads 4

27. 设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启

glueoutputbuf yes

28. 指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法

hash-max-zipmap-entries 64

hash-max-zipmap-value 512

29. 指定是否激活重置哈希,默认为开启(后面在介绍Redis的哈希算法时具体介绍)

activerehashing yes

30. 指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

include /path/to/local.conf

=============================================================

 

问题讨论:
1. Redis官方文档对VM的使用提出了一些建议:
当你的key很小而value很大时,使用VM的效果会比较好.因为这样节约的内存比较大.
当你的key不小时,可以考虑使用一些非常方法将很大的key变成很大的value,比如你可以考虑将key,value组合成一个新的value.
最好使用linux ext3 等对稀疏文件支持比较好的文件系统保存你的swap文件.
vm-max-threads这个参数,可以设置访问swap文件的线程数,设置最好不要超过机器的核数.如果设置为0,那么所有对swap文件的操作都是串行的.可能会造成比较长时间的延迟,但是对数据完整性有很好的保证.
2. 关于Redis新的存储模式diskstore(http://timyang.net/data/redis-diskstore),
节选:
适合Web 2.0数据访问最佳的方式就是完全基于内存,比如用Memcached或者Redis snapshot方式。但是更多的业务场景是数据规模会超过RAM容量,因此有几种不同的设计模式。

VM方式: 将数据分页存放,由应用(如Redis)或者操作系统(如Varnish)将访问量较少的页即冷数据swap到磁盘上,访问多的页面由磁盘自动换出到内存中。应用实现VM缺点是代码逻辑复杂,如果业务上冷热数据边界并不分明,则换入换出代价太高,系统整体性能低。不少抢鲜的网友在微博上也反馈过使用VM种种不稳定情况。操作系统实现缺点在于主要OS的VM换入换出是基于Page概念,比如OS VM1个Page是4K, 4K中只要还有一个元素即使只有1个字节被访问,这个页也不会被SWAP,换入也同样道理,读到一个字节可能会换入4K无用的内存。而Redis自己实现则可以达到控制换入的粒度。另外访问操作系统SWAP内存区域时block进程,也是导致Redis要自己实现VM原因之一。   磁盘方式: 所有的数据读写访问都是基于磁盘,由操作系统来只能的缓存访问的数据。由于现代操作系统都非常聪明,会将频繁访问的数据加入到内存中,因此应用并不需要过多特殊逻辑。MongoDB就是这种设计方式。这种方式也有一些已知的缺点,比如操作MMap写入磁盘由操作系统控制,操作系统先写哪里后写哪里应用并不知情,如果写入过程中发生了crash则数据一致性会存在问题。这个也是MongoDB饱受争议的单机Durability问题 

硬盘存储+cache方式: 实际原理和mysql+memcache方式类似,只不过将两者功能合二为一到一个底层服务中,简化了调用。

在上面几种方式中,除去VM,antirez觉得MongoDB方式也不太适合,因此选择了disktore方式来实现新的磁盘存储,具体细节是
1) 读操作,使用read through以及LRU方式。内存中不存在的数据从磁盘拉取并放入内存,内存中放不下的数据采用LRU淘汰。

 

2) 写操作,采用另外spawn一个线程单独处理,写线程通常是异步的,当然也可以把cache-flush-delay配置设成0,Redis尽量保证即时写入。但是在很多场合延迟写会有更好的性能,比如一些计数器用Redis存储,在短时间如果某个计数反复被修改,Redis只需要将最终的结果写入磁盘。这种做法作者叫per key persistence。由于写入会按key合并,因此和snapshot还是有差异,disk store并不能保证时间一致性。由于写操作是单线程,即使cache-flush-delay设成0,多个client同时写则需要排队等待,如果队列容量超过cache-max-memory,Redis设计会进入等待状态,造成调用方卡住。Google Group上有热心网友迅速完成了压力测试,当内存用完之后,set每秒处理速度从25k下降到10k再到后来几乎卡住。虽然通过增加cache-flush-delay可以提高相同key重复写入性能;通过增加cache-max-memory可以应对临时峰值写入。但是diskstore写入瓶颈最终还是在IO。
3) rdb 和新 diskstore 格式关系
rdb是传统Redis内存方式的存储格式,diskstore是另外一种格式,那两者关系如何?1.通过BGSAVE可以随时将diskstore格式另存为rdb格式,而且rdb格式还用于Redis复制以及不同存储方式之间的中间格式。 

2.通过工具可以将rdb格式转换成diskstore格式。

.NET 客户端连接Redis 介绍

C#写的客户端类型主要有:

ServiceStack.Redis ★  https://github.com/ServiceStack/ServiceStack.Redis
Booksleeve ★   http://code.google.com/p/booksleeve/
Sider  http://nuget.org/List/Packages/Sider
TeamDev Redis Client  http://redis.codeplex.com/
redis-sharp   https://github.com/migueldeicaza/redis-sharp

加星号的是使用率高的项目。参考:http://redis.io/clients

ServiceStack.Redis为例

引入到项目中,并添加如下名空间引用(仅限本文):

using ServiceStack.Common.Extensions;
using ServiceStack.Redis;
using ServiceStack.Redis.Generic;
using ServiceStack.Text;
using ServiceStack.Redis.Support;

注:ServiceStackRedis封装了大量方法和对象,这里只摘有代表性的内容介绍,更多内容参见其官方文档。

声明一个客户端对象:
protected RedisClient Redis = new RedisClient("127.0.0.1", 6379);//redis服务IP和端口

一 .基本KEY/VALUE键值对操作:
1. 添加/获取:

List<string> storeMembers =new List<string>();
storeMembers.ForEach(x
=> Redis.AddItemToList("additemtolist", x));

注:也可直接使用AddRangeToList方法将一组数据装入如:

Redis.AddRangeToList("addarrangetolist", storeMembers);

2. 获取数据

var members = Redis.GetAllItemsFromList("additemtolist");
members.ForEach(s
=> Response.Write("<br/>additemtolist :"+ s));

3. 获取指定索引位置数据

var item = Redis.GetItemFromList("addarrangetolist", 2);

4. 移除:

var list = Redis.Lists["addarrangetolist"];
list.Clear();
//清空
list.Remove("two");//移除指定键值
list.RemoveAt(2);//移除指定索引位置数据

二.存储对象:

publicclass UserInfo
{
publiclong Id { set; get; }
publicstring UserName { get; set; }
publicint Age { get; set; }
}

1.通常方式(底层使用json序列化):

Redis.Set<UserInfo>("userinfo", new UserInfo() { UserName ="李四", Age =45 });
UserInfo userinfo
= Redis.Get<UserInfo>("userinfo");

注:当然上面方式也适合于基本类型,如:

Redis.Set<int>("my_age", 12);//或Redis.Set("my_age", 12);
int age = Redis.Get<int>("my_age");

2.object序列化方式存储:

var ser =new ObjectSerializer(); //位于namespace ServiceStack.Redis.Support;
bool result = Redis.Set<byte[]>("userinfo", ser.Serialize(new UserInfo() { UserName ="张三", Age =12 }));
UserInfo userinfo
= ser.Deserialize(Redis.Get<byte[]>("userinfo")) as UserInfo;
//也支持列表
Redis.Set<byte[]>("userinfolist_serialize", ser.Serialize(userinfoList));
List
<UserInfo> userList = ser.Deserialize(Redis.Get<byte[]>("userinfolist_serialize")) as List<UserInfo>;

需要说明的是在测试过程中发现JSON序列化的效率要比object序列化高一些。

三.存储表格对象,比如:

using (var redisUsers = Redis.GetTypedClient<UserInfo>())
{
redisUsers.Store(
new UserInfo { Id = redisUsers.GetNextSequence(), UserName ="daizhj", Age =12 });
redisUsers.Store(
new UserInfo { Id = redisUsers.GetNextSequence(), UserName ="daizhenjun", Age =13 });

var allUsers = redisUsers.GetAll();//就像操作ado对象一样,可以进行CRUD等操作
allUsers.ForEach(s => Response.Write("<br/>user :"+ s.UserName +" age:"+ s.Age));
}

四.使用客户端链接池模式提升链接速度:

publicstatic PooledRedisClientManager CreateManager(string[] readWriteHosts, string[] readOnlyHosts)
{
//支持读写分离,均衡负载
returnnew PooledRedisClientManager(readWriteHosts, readOnlyHosts, new RedisClientManagerConfig
{
MaxWritePoolSize
=5,//“写”链接池链接数
MaxReadPoolSize =5,//“写”链接池链接数
AutoStart =true,
});
}

声明链接池对象(这里只使用一个redis服务端):

PooledRedisClientManager prcm = CreateManager(newstring[] { "127.0.0.1:6379" }, newstring[] { "127.0.0.1:6379" });

List<UserInfo> userinfoList =new List<UserInfo>();
userinfoList.Add(
new UserInfo() { UserName ="pool_daizhj", Age =1 });
userinfoList.Add(
new UserInfo() { UserName ="pool_daizhj1", Age =2 });

从池中获取一个链接:

using (IRedisClient Redis = prcm.GetClient())
{
Redis.Set(
"userinfolist", userinfoList);
List
<UserInfo> userList = Redis.Get<List<UserInfo>>("userinfolist");
}

注:
1.前三种方式我在本地测试发现存取效率从高到底,具体原因还待分析。

2.如只想使用长链接而不是链接池的话,可以直接将下面对象用static方式声明即可:
protected static RedisClient Redis = new RedisClient("127.0.0.1", 6379);

这样在redis服务端显示只有一个客户链接

3.与memcached测试过程中发现,在存储时两者效率接近(使用本文第一种方式),在取数据时memcached速度比redis要快一些(毫秒级差异),这一点并不像网上一些文章所说的那样,看来在实际开发和生产环境下还要以使用背景及结果为准。

Redis指令文档(参考)

Redis指令文档

连接控制
QUIT 关闭连接
AUTH (仅限启用时)简单的密码验证

适合全体类型的命令
EXISTS key 判断一个键是否存在;存在返回 1;否则返回0;
DEL key 删除某个key,或是一系列key;DEL key1 key2 key3 key4
TYPE key 返回某个key元素的数据类型 ( none:不存在,string:字符,list,set,zset,hash)
KEYS pattern 返回匹配的key列表 (KEYS foo*:查找foo开头的keys)
RANDOMKEY 随机获得一个已经存在的key,如果当前数据库为空,则返回空字符串
RENAME oldname newname更改key的名字,新键如果存在将被覆盖
RENAMENX oldname newname 更改key的名字,如果名字存在则更改失败
DBSIZE返回当前数据库的key的总数
EXPIRE设置某个key的过期时间(秒),(EXPIRE bruce 1000:设置bruce这个key1000秒后系统自动删除)注意:如果在还没有过期的时候,对值进行了改变,那么那个值会被清除。
TTL查找某个key还有多长时间过期,返回时间秒
SELECT index 选择数据库
MOVE key dbindex 将指定键从当前数据库移到目标数据库 dbindex。成功返回 1;否则返回0(源数据库不存在key或目标数据库已存在同名key);
FLUSHDB 清空当前数据库中的所有键
FLUSHALL 清空所有数据库中的所有键

处理字符串的命令
SET key value 给一个键设置字符串值。SET keyname datalength data (SET bruce 10 paitoubing:保存key为burce,字符串长度为10的一个字符串paitoubing到数据库),data最大不可超过1G。
GET key获取某个key 的value值。如key不存在,则返回字符串“nil”;如key的值不为字符串类型,则返回一个错误。
GETSET key value可以理解成获得的key的值然后SET这个值,更加方便的操作 (SET bruce 10 paitoubing,这个时候需要修改bruce变成1234567890并获取这个以前的数据paitoubing,GETSET bruce 10 1234567890)
MGET key1 key2 … keyN 一次性返回多个键的值
SETNX key value SETNX与SET的区别是SET可以创建与更新key的value,而SETNX是如果key不存在,则创建key与value数据
MSET key1 value1 key2 value2 … keyN valueN 在一次原子操作下一次性设置多个键和值
MSETNX key1 value1 key2 value2 … keyN valueN 在一次原子操作下一次性设置多个键和值(目标键不存在情况下,如果有一个以上的key已存在,则失败)
INCR key 自增键值
INCRBY key integer 令键值自增指定数值
DECR key 自减键值
DECRBY key integer 令键值自减指定数值

处理 lists 的命令
RPUSH key value 从 List 尾部添加一个元素(如序列不存在,则先创建,如已存在同名Key而非序列,则返回错误)
LPUSH key value 从 List 头部添加一个元素
LLEN key 返回一个 List 的长度
LRANGE key start end从自定的范围内返回序列的元素 (LRANGE testlist 0 2;返回序列testlist前0 1 2元素)
LTRIM key start end修剪某个范围之外的数据 (LTRIM testlist 0 2;保留0 1 2元素,其余的删除)
LINDEX key index返回某个位置的序列值(LINDEX testlist 0;返回序列testlist位置为0的元素)
LSET key index value更新某个位置元素的值
LREM key count value 从 List 的头部(count正数)或尾部(count负数)删除一定数量(count)匹配value的元素,返回删除的元素数量。
LPOP key 弹出 List 的第一个元素
RPOP key 弹出 List 的最后一个元素
RPOPLPUSH srckey dstkey 弹出 _srckey_ 中最后一个元素并将其压入 _dstkey_头部,key不存在或序列为空则返回“nil”

处理集合(sets)的命令(有索引无序序列)
SADD key member增加元素到SETS序列,如果元素(membe)不存在则添加成功 1,否则失败 0;(SADD testlist 3 n one)
SREM key member 删除SETS序列的某个元素,如果元素不存在则失败0,否则成功 1(SREM testlist 3 N one)
SPOP key 从集合中随机弹出一个成员
SMOVE srckey dstkey member 把一个SETS序列的某个元素 移动到 另外一个SETS序列 (SMOVE testlist test 3n two;从序列testlist移动元素two到 test中,testlist中将不存在two元素)
SCARD key 统计某个SETS的序列的元素数量
SISMEMBER key member 获知指定成员是否存在于集合中
SINTER key1 key2 … keyN 返回 key1, key2, …, keyN 中的交集
SINTERSTORE dstkey key1 key2 … keyN 将 key1, key2, …, keyN 中的交集存入 dstkey
SUNION key1 key2 … keyN 返回 key1, key2, …, keyN 的并集
SUNIONSTORE dstkey key1 key2 … keyN 将 key1, key2, …, keyN 的并集存入 dstkey
SDIFF key1 key2 … keyN 依据 key2, …, keyN 求 key1 的差集。官方例子:
key1 = x,a,b,c
key2 = c
key3 = a,d
SDIFF key1,key2,key3 => x,b
SDIFFSTORE dstkey key1 key2 … keyN 依据 key2, …, keyN 求 key1 的差集并存入 dstkey
SMEMBERS key 返回某个序列的所有元素
SRANDMEMBER key 随机返回某个序列的元素

处理有序集合(sorted sets)的命令 (zsets)
ZADD key score member 添加指定成员到有序集合中,如果目标存在则更新score(分值,排序用)
ZREM key member 从有序集合删除指定成员
ZINCRBY key increment member 如果成员存在则将其增加_increment_,否则将设置一个score为_increment_的成员
ZRANGE key start end 返回升序排序后的指定范围的成员
ZREVRANGE key start end 返回降序排序后的指定范围的成员
ZRANGEBYSCORE key min max 返回所有符合score >= min和score <= max的成员 ZCARD key 返回有序集合的元素数量 ZSCORE key element 返回指定成员的SCORE值 ZREMRANGEBYSCORE key min max 删除符合 score >= min 和 score <= max 条件的所有成员

排序(List, Set, Sorted Set)
SORT key BY pattern LIMIT start end GET pattern ASC|DESC ALPHA 按照指定模式排序集合或List

SORT mylist
默认升序 ASC

SORT mylist DESC

SORT mylist LIMIT 0 10
从序号0开始,取10条

SORT mylist LIMIT 0 10 ALPHA DESC
按首字符排序

SORT mylist BY weight_*
SORT mylist BY weight_* GET object_*
SORT mylist BY weight_* GET object_* GET #

SORT mylist BY weight_* STORE resultkey
将返回的结果存放于resultkey序列(List)

持久控制
SAVE 同步保存数据到磁盘
BGSAVE 异步保存数据到磁盘
LASTSAVE 返回上次成功保存到磁盘的Unix时间戳
SHUTDOWN 同步保存到服务器并关闭 Redis 服务器(SAVE+QUIT)
BGREWRITEAOF 当日志文件过长时重写日志文件

远程控制命令
INFO 提供服务器的信息和统计信息
MONITOR 实时输出所有收到的请求
SLAVEOF 修改复制选项

redis目前提供四种数据类型:string,list,set及zset(sorted set)。
* string是最简单的类型,你可以理解成与Memcached一模一个的类型,一个key对应一个value,其上支持的操作与Memcached的操作类似。但它的功能更丰富。
* list是一个链表结构,主要功能是push、pop、获取一个范围的所有值等等。操作中key理解为链表的名字。
* set是集合,和我们数学中的集合概念相似,对集合的操作有添加删除元素,有对多个集合求交并差等操作。操作中key理解为集合的名字。
* zset是set的一个升级版本,他在set的基础上增加了一个顺序属性,这一属性在添加修改元素的时候可以指定,每次指定后,zset会自动重新按新的值调整顺序。可以理解了有两列的mysql表,一列存value,一列存顺序。操作中key理解为zset的名字。

协议
redis目前只有基于TCP的文本协议,与memcache类似,有一些改进。
客户端通常发送
命令 参数… 值字节数rn
值rn

服务端的返回,根据第一个字节,可以判断:
- 错误信息
+ 普通文本信息
$ 变长字节数,$6表示CRLF之后有6个字节的字符
: 返回一个整数
* 返回组数,即*6表示CRLF之后将返回6组变长字符

注意事项:
Key不可包含空格或者回车符
Key不要过长或过短,应使其有意义,如”comment:1234:reply.to”

Redis使用系列:协议篇

中文命令参考:

   http://redis.readthedocs.org/en/latest/

 

---------------------------------------------------------------------------

Redis从1.2版本开始,设计了一套统一的协议格式,作者讲到自己设计的协议在下面几个方面进行了权衡:
1. 实现简单
2. 快速通过计算机解析
3. 容易让人阅读

如果我们需要自己实现一个Redis客户端程序,有必要了解一下Redis的协议格式。在网络层面,客户端通过TCP连接到Redis服务器(默认端口6379,可以通过配置文件修改),客户端与服务器之间发送的命令以\r\n(CR LF)结尾。

请求协议

Redis请求参数的通用格式如下:

*<参数数量> CR LF  $<第1个参数字节数> CR LF  <参数数据> CR LF  ...  $<第N个参数字节数> CR LF  <参数数据> CR LF

举个例子,要使用SET命令在Redis中存储一条key=mykey,value=myvalue的数据,则客户端发送给Redis的服务器协议如下:

*3  $3  SET  $5  mykey  $7  myvalue

最终发给Redis服务器的二进制数据用字符串表示是:

*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n

应答协议

Redis的应答命令分为不同的种类,每种应答使用不同的表示方式,下面逐一说明。

1. 单行应答:使用“+”开始,后面跟应答字符串,以\r\n结尾,客户端实现给应用程序返回“+”后面的内容,如:

+OK\r\n

2. 错误应答:与单行应答类似,只不过以“-”开始,如:

-(error) ERR unknown command 'INC'\r\n

3. 整数应答:使用“:”开始,后面跟应答内容(表示整数的字符串),以\r\n结尾,如:

:1000\r\n

4. Bulk应答:如使用GET命令获取一个字符串,服务器会使用Bulk应答,使用“$”开始,后面跟应答数据字节数(+\r\n),再加上应答数据,最后以\r\n结尾,如:

$7\r\nmyvalue\r\n

如果没有获取到结果(如请求的Key不存在),服务器将会应答-1,如:

$-1

5. 批量应答:有些命令如LRANGE等,需要返回多个应答值,此时Redis采用与请求命令相同的协议格式发送应答:

*4  $3  foo  $3  bar  $5  hello  $5  world

客户端接收到的二进制数据用字符串表示:

*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nhello\r\n$5\r\nworld\r\n

如果没有获取到结果,服务器会应答-1,如:

*-1

其他说明

1. Redis支持Pipelining把多个命令打包在一起发送以减少RTT,详细信息请参考官网这篇文章

2. 在Redis 2.2版本中提供了基于C的客户端hiredis(以前也提供,但是2.2版本进行了大规模的重构),在实现一个客户端时是一个很好的参考。

Redis使用系列通过三篇文章:配置文件分析、功能示例和协议,为大家介绍了Redis的使用。接下来是时候去看看Redis的源码,并通过源码分析内部实现,如VM机制、字符串实现等,Redis源码分析系列见。