分布式锁应该具备的条件:
- 在分布式系统环境下,任意时刻,只能有一个客户端能持有锁;
- 高可用/高性能的获取锁和释放锁,必须保证加锁和解锁是同一个客户端所为;
- 具备锁失效机制(防止死锁),即使有一个客户端在持有锁期间崩溃而没有主动释放锁,也能保证后续其他客户端能正常加锁;
- 具备非堵塞特性,即没有获取到锁,直接返回获取锁失败。
示例代码:
class RedisLock:
"""
分布式锁
"""
def __init__(self, redis_conn, Django=False):
"""
@param redis_conn: redis 实例对象
@param Django: 是否是django,如果是django框架就使用django自带缓存
@type Django: bool
"""
self.redis_conn = redis_conn
self.ip = socket.gethostbyname(socket.gethostname())
self.pid = os.getpid()
self.sentinel = object()
self.django = Django
@staticmethod
def get_lock_key(key: str) -> str:
"""
格式化锁名称
@param key: 名称
@type key: str
@return: key 名称
@rtype: str
"""
lock_key = f'lock_{key}'
return lock_key
def gen_unique_value(self) -> str:
"""
获取锁value使锁有唯一性,防止其它线程误删
@return:
@rtype:
"""
thread_name = threading.current_thread().name
time_now = time.time()
unique_value = f'{self.ip}-{self.pid}-{thread_name}-{time_now}'
return unique_value
def get_lock(self, key, timeout: int = 100) -> str:
"""
获取锁
@param key: 锁名
@type key: str
@param timeout: 锁过期时间,防止死锁
@type timeout: int
@return:
@rtype: str
"""
lock_key = self.get_lock_key(key)
unique_value = self.gen_unique_value()
while True:
if self.django:
judge = self.redis_conn.add(lock_key, unique_value, timeout)
else:
judge = self.redis_conn.set(lock_key, unique_value, nx=True, ex=timeout)
if judge:
return unique_value
time.sleep(0.001)
def del_lock(self, key, value):
# 释放锁
lock_key = self.get_lock_key(key)
old_lock_value = self.redis_conn.get(lock_key)
if old_lock_value == value:
return self.redis_conn.delete(lock_key)
加锁过程
- 首先需要为锁生成一个唯一标识value;
- 然后使用redis set 命令设置锁,从 v2.6.12 版本开始,set命令支持
nx
和ex
参数,具体内容可点击进行查看;如果锁之前不存在,则加锁成功,并设置锁的过期时间,返回锁唯一标识;
解锁过程
- 查询锁对应的标识是否与本次解锁的标识相同;
- 如果标识相同,则在事务中删除锁。
下面对刚才实现的分布式锁进行测试,使用50个线程,模拟秒杀10张票,从结果的有序性可以看出是否为加锁状态,代码如下:
from threading import Thread
import redis
count = 10
def ticket(i, lock, key):
lock_value = lock.get_lock(key, 10)
print(f"线程{i}--获得了锁")
time.sleep(1)
global count
if count < 1:
print(f"线程{i}没抢到票, 票已经抢完了")
return
count -= 1
print(f"线程{i}抢到票了, 还剩{count}张票")
lock.del_lock(key, lock_value)
print(f"线程{i}--释放了锁")
if __name__ == '__main__':
# Redis 存字符串返回的是byte,指定decode_responses=True可以解决
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, socket_connect_timeout=3, decode_responses=True)
_redis = redis.Redis(connection_pool=pool)
lock = RedisLock(_redis)
for i in range(10):
t = Thread(target=ticket, args=(i, lock, 'test_key'))
t.start()
输出如下:
线程1--获得了锁
线程1抢到票了, 还剩4张票
线程1--释放了锁
线程2--获得了锁
线程4--获得了锁
线程3--获得了锁
线程0--获得了锁
线程6--获得了锁
线程7--获得了锁
线程8--获得了锁
线程5--获得了锁
线程9--获得了锁
线程11--获得了锁
线程10--获得了锁
线程13--获得了锁
线程2抢到票了, 还剩3张票
线程2--释放了锁
线程14--获得了锁
线程12--获得了锁
线程15--获得了锁
线程16--获得了锁
线程17--获得了锁
线程19--获得了锁
线程18--获得了锁
线程4抢到票了, 还剩2张票
线程4--释放了锁
线程3抢到票了, 还剩1张票
线程0抢到票了, 还剩0张票
线程8没抢到票, 票已经抢完了
线程6没抢到票, 票已经抢完了
线程7没抢到票, 票已经抢完了
线程3--释放了锁
线程5没抢到票, 票已经抢完了
线程0--释放了锁
线程9没抢到票, 票已经抢完了
线程11没抢到票, 票已经抢完了
线程10没抢到票, 票已经抢完了
线程13没抢到票, 票已经抢完了
线程14没抢到票, 票已经抢完了
线程12没抢到票, 票已经抢完了
线程15没抢到票, 票已经抢完了
线程16没抢到票, 票已经抢完了
线程17没抢到票, 票已经抢完了
线程19没抢到票, 票已经抢完了
线程18没抢到票, 票已经抢完了
装饰器
def redis_lock(key: str, timeout: int = 10, redis_conn: object = cache, Django=Ture):
"""
@param key: 锁名称
@type key: str
@param timeout: 锁过期时间
@type timeout: int
@param redis_conn: Redis实例
@type Django bool
@param Django 是否是django框架缓存 默认使用redis缓存
"""
lock = RedisLock(redis_conn, Django=Django)
def run_lock(func):
def wrapper(*args, **kwargs):
thread_name = threading.current_thread().name
logger.info(f"{thread_name} 线程, {key} key,正在获取锁")
lock_value = lock.get_lock(key, timeout)
logger.info(f"{thread_name} 线程, {key} key,已获取锁")
res = func(*args, **kwargs)
lock.del_lock(key, lock_value)
logger.info(f'{thread_name} 线程, {key} key,释放锁')
return res
return wrapper
return run_lock
### 使用装饰器
@redis_lock(key='test', timeout=15)
def increase_data():
# lock = RedisLock(cache)
# lock_value = lock.get_lock('test')
time.sleep(10)
# lock.del_lock('test', lock_value)
发表评论
共 0 条评论
暂无评论