弹幕系统

长连接

1,当用户在观看视频时,如果客户端针对某一视频创建了弹幕,发送后端进行处理,后端需要对所有正在观看该视频的用户推送该弹幕。如果使用短连接进行通信进行通信,HTTP协议是一个请求-响应协议,请求必须先由浏览器发给服务器,服务器才能响应这个请求,再把数据发送给浏览器。所有观看视频的客户端不断轮询后端,若有新的弹幕则拉取后进行显示,即使无新弹幕也不断发起连接以了解服务器有没有新的信息,故必须不停连接后端轮询的效率低或者 HTTP 连接始终打开(长连接存在一个保持时间限制,一个HTTP连接在长时间没有数据传输的情况下,链路上的任何一个网关都可能关闭这个连接,而网关是我们不可控的,必须定期发一些ping数据表示连接“正常工作”,虽然可以在一次 TCP 连接中完成多个 HTTP 请求,但是对每个请求仍然要单独发 header,要大量交换 HTTP header),非常浪费资源。

采用WebSocket进行前后端通信,建立持久的双边通信通道。WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(Full-Duplex)通信,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。WebSocket与HTTP协议兼容,默认端口也是80和443,并且握手阶段采用 HTTP 协议,使用HTTP Upgrade头从HTTP协议更改为WebSocket协议,响应代码101表示本次连接的HTTP协议即将被更改,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。

bg2017051502

2,整体架构

image-20220421000624678

客户端与服务器间建立websocket通信。在向用户发送新建弹幕时,服务压力较大,比如1000人同时观看同一个视频,并且同时发送一条弹幕,对于服务器端需要向1000人发送新建弹幕,每人发送1000条新建弹幕,共计1000000次请求。对此使用消息队列进行削峰,每次只处理部分发送请求,其余请求放在MQ中等待被处理,对于用户来说,弹幕存在3妙内的延迟是可以接受的,根据服务器最大吞吐量时的并发处理数量,决定每次从MQ中拉取的任务数。

数据保存到数据库时,由于弹幕的保存对用户无感,且操纵耗时,可以使用异步操作;使用redis保存新添加弹幕,下次读取弹幕直接从redis中读取,避免访问数据库,降低数据库压力,向redis写入当弹幕数据时,由于redis读写快速,可以使用同步操作,同时也是为了保证下一次查询数据时,即使数据还没落盘,也能在redis中找到。

数据库表

t_danmu保存单挑弹幕信息,包括:发送者ID、弹幕所属视频ID、弹幕内容、弹幕在视频哪个时间发送。由于经常需要查询userId,videoId字段,所以添加索引以加快查询速度,同时他们还是其余表的主键,需要添加外键约束。

消息队列

当不需要立即获得结果,但是并发量又需要进行控制的时候。

1,优点:

1506330094150_2541_1506330096475

2,工作模式

1506330158945_9538_1506330161280

3,RabbitMQ

4,问题

阻塞非阻塞与同步异步
IO多路复用

通过一种机制,可以监视多个文件描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作,没有就绪事件时,就会阻塞交出cpu。多路是指多个链接,复用指的是复用同一线程。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

NIO

是一种同步非阻塞的 I/O 模型,在等待就绪阶段都是非阻塞的,真正的 I/O 操作是同步阻塞。是 I/O 多路复用的基础,成为解决高并发与大量连接、I/O 处理问题的有效方式。

服务器端同步阻塞 I/O 处理:socket.accept()、socket.read()、socket.write() 三个主要函数都是同步阻塞的,当一个连接在处理 I/O 的时候,系统是阻塞的,所以使用多线程时,就可以让 CPU 去处理更多的事情。低并发下结合线程池使得创建和回收成本相对较低,并且编程模型简单。创建和销毁都是重量级的系统函数,线程本身占用较大内存,线程的切换成本是很高的,无法应对百万级连接。

所有的系统 I/O 都分为两个阶段:等待就绪和操作。举例来说,读函数,分为等待系统可读和真正的读;同理,写函数分为等待网卡可以写和真正的写。NIO 里用户最关心” 我可以读了”。NIO的读写函数可以立刻返回而不是柱塞,如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,将用于传输的通道全部注册到选择器上,选择器监控通道,当某一通道就绪后连接继续进行读写,没有必要开启多线程。没有线程切换,只是拼命的读、写、选择事件。

77752ed5

Java NIO 实际读写时的核心在于:通道(Channel)和缓冲区(Buffer),选择器。通道表示打开到 IO 设备(文件流、套接字)的连接,对原 I/O 包中的流的模拟,负责传输;缓冲区用于容纳数据,负责存储,Channel的读写必须通过buffer对象,然后操作缓冲区,对数据进行处理。缓存区是双向的,既可以往缓冲区写入数据,也可以从缓冲区读取数据:缓冲区<->然后缓冲区通过通道进行传输<->从缓冲区取数据。选择器:把Channel通道注册到Selector中,通过Selecotr监听Channel中的事件状态,这样就不需要阻塞等待客户端的连接,从主动等待客户端的连接,变成了通过事件驱动,通过事件驱动实现单线程管理多个Channel的目的。

0ece5d16ec1345a5b4dc2149cb5a8b40_tplv-k3u1fbpfcp-zoom-in-crop-mark_1304_0_0_0

缓冲区根据数据类型的不同,可以进行划分ByteBuffer、CharBuffer等。根据工作方式分:直接缓冲区(磁盘->内核地址空间中->用户地址空间中->读取到应用程序)与非直接缓冲区(将缓冲区建立在物理内存之中,读写数据直接通过物理内存进行)。

连接建立

1,信息保存,使用AtomicInteger记录在线人数,每新建一个连接就加一,断开连接就减一;由于每新建一个连接都会新建一个ServerEndpoint,所以使用ConcurrentHashMap用于保持<sessionId,WebSocketService>,保存用户连接信息,每新建一个连接就添加新的连接信息,断开连接就删除连接信息。使用Session于连接绑定,保存于某个用户会话信息。

同时由于WebSocketService非单列,spring只会对第一个WebSocketService注入所需依赖,后续WebSocketService无法获得需要的依赖,需要手动从ApplicationContext获取需要的依赖,而ApplicationContext的注入在当前项目的main方法完成。

Bean作用域
Map

1,填装因子

loadFactor是HashMap负载程度的一个度量,即HashMap持有的元素数量和HashMap大小的比值,loadFactor是为了让HashMap尽可能不满而存在的,理论上一次成功/不成功的查找耗时均为O(1+a),a为装载因子,加载因子越大,填满的元素越多,空间利用率越高,但冲突的机会加大了,查找时间复杂度上升。反之加载因子越小,填满的元素越少,冲突的机会减小,查找时间复杂度下降,但空间浪费多了。当HashMap中的元素数量大于capacity*loadFactor时,HashMap就要扩容,并进行重新计算元素存储位置。

HashMap碰撞与否,其实是与hashCode()的设计有很大关系,如果哈希函数得当,就可以使哈希地址尽可能的均匀分布在哈希地址空间上,从而减少冲突的产生,但一个良好的哈希函数的得来很大程度上取决于大量的实践,此外各种情形下设置也不尽相同,比如ThreadLocalMap的装载因子为2/3,,因为ThreadLocalMap使用线性探查解决冲突,转载因子太大将导致错位区块的产生。转载因子变大查询复杂度升高,但是占用的空间降低了,对于内存消耗频繁/GC频繁的应用来说,如果能接受hashmap的查询耗时损耗,将转载因子变大可能是非常值得的。

2,链表转红黑树阈值

虽然红黑树有更好的查找效率O(log(N)),但是TreeNode的大小约为链表节点的两倍,在红黑树进行插入、删除等操作时为了平衡红黑树还要进行额外的操作,维护成本明显高于链表。所以只有在一个拉链已经拉了足够节点(默认为8)并且HashMap容量大于等于64的时候才会转为tree,否则进行扩容。当这个hash桶的节点因为移除后resize数量变小(默认为6)的时候,会将树再转为拉链。

哈希DoS攻击:RESTful兴起,数据传输使用Json,在收到Json字符串后进行jsonDecode操作,攻击者借由发送一条充满数千个变量的POST报文,所有变量的hashcode相同,在将数据存入HashMap时,某个哈希桶中有大量数据,导致哈希函数就会超载,仅是处理此单一请求也需要大量时间,n个数据插入复杂度O(N^2)),使用红黑树实现更快的查找,实现兜底。

3,哈希计算

hash=(h=key.hashCode())^(h>>>16),key的hash值高16位不变,低16位与高16位异或作为key的最终hash值,最后元素下标为:index=(n-1)&hash,(本质为除n取余,当且仅当n=2^k时成立),其中n=table.length。

2884823463jdfdjfgjdf

因为table的长度都是2的幂,因此如果对hashCode直接取余,index仅与hashCode的低n位有关,hashCode的高位都被与操作置为0了,这样做很容易产生碰撞。通过将高16位与低16位异或来获得hsah值,使下标值同时用到高位与低位的信息,hashCode只要有一位发生改变,整个hash返回值就会改变,保证了hash值的随机性。

3,扩容

容量变为以前的两倍。对于每个哈希桶,如果桶里面只有一个元素,直接重新计算下标newIndex=(e.hash&(newCap-1))放入新的哈希表;挨个计算`(e.hash&oldCap)==0(其中oldCap=2^k)是否成立,如果成立表明hash的第k+1个二进制为0,此时newIndex=(newCap-1)&hash的最高位也就是第k+1个二进制位为0,也是时newIndex=oldIndex,newIndex仍旧在原先的旧位置,如果hash的第k+1个二进制为1,则newIndex相较于oldIndex多出来第k+1位,也就是newIndex=2^k+oldIndex=oldCap+oldIndex。通过遍历桶中元素,将元素分为低位(oldIndex)、高位(oldCap+oldIndex)两个链表。如果桶里面原先装的是红黑树还要判断各自是否满足树化条件,如果满足还要转换为红黑树。扩容的时间复杂度为O(N),一次扩容后还能再插入N个数据,所以扩容的成本可以均摊到后续N个元素中,每个元素的扩容成本为O(1),最终元素的插入时间复杂度仍为O(1)。

4,添加

HashMap使用延迟初始化,在第一次添加数据时才初始化数据结构,如果table没有使用过的情况则以默认大小进行一次resize。计算key的hashCode、hash、索引值,然后获取底层table数组的数据,如果获取的数据为null,表明不存在hash冲突,则直接插入;如果不为null先检查该bucket第一个元素是否是和插入的key相等,相等则直接替换;如果和第一个元素的key不相等并且是TreeNode的情况,调用TreeNode的put方法将数据插入红黑树,并进行相应的左旋、右旋等平衡操作;否则桶内就存储的是单链表,循环遍历链表,如果找到相等key的节点则跳出循环,替换节点的值,并将旧值返回;如果链表中没找到key相等的节点达到最后一个节点时将新的节点添加到链表最后,此时新增了key-value对,如果桶内元素个数达到树化的阈值就将单链表转换为红黑树。最后判断总的元素个数判断是否超过了threshold,如果超过则需要进行resize扩容,然后返回null。

5,初始化参数

初始化容量:initialCapacity的默认值是16,即使内存足够,也不能将initialCapacity设得过大,虽然大初始化容量可避免扩容导致的效率的下降,get和put方法都是常数复杂度的,也不是因此而增加时间复杂度。但是实际的程序可能不仅仅使用get和put方法,也有可能使用迭代器,如使用EntrySet迭代时,底层实现时挨个遍历哈希桶,再在桶里挨个遍历节点,如果initialCapacity容量较大,导致大量空哈希桶,那么会使迭代器效率降低。所以理想的情况还是在使用HashMap前估计一下数据量,太小反复扩容导致得数组复制、重新计算下标、重新构建红黑树的开销,太大空间利用率低,迭代器遍历成本上升。

6,key选择

key一般选择Immutable对象(Immutable:创建之后就不能发生改变,任何对它的改变都应该产生一个新的对象;对象应该是final的,以此来限制子类继承父类,以避免子类改变了父类的immutable特性;如果类中包含mutable类对象,那么返回给客户端的时候,返回该对象的一个拷贝,而不是该对象本身,防止用户修改。这也不绝对,只要参与计算hashCode、equals、compare得字段不变即可),并且覆写hashCode()以及equals()方法,如果在HashMap中使用可变对象作为Key带来的问题:如果HashMapKey发生变化,导致hashCode()/equal()的结果改变,那么该key在HashMap中的存取时可能再也查找不到这个Entry了。常见的Key比如String、Integer这些类已经很规范的覆写了hashCode()以及equals()方法,并且作为不可变类天生是线程安全的,可以不用被synchronize就在并发环境中共享,可以很好的优化比如因为不可变所以可以缓存hash值,避免重复计算等。

HashMap , Hashtable , ConcurrentHashMap

1,线程安全

HashtableConcurrentHashMap是线程安全,HashMap是非线程安全。多线程环境下HashTable的对数据进行修改的方法都使用了synchronized描述符,来满足线程安全的特性,使用了对象级别的同步锁,读和写操作都需要获取锁,本质上仍然只允许一个线程访问,其他线程被排斥在外,当每次对Map做修改操作的时候都会锁住这个Map对象。HashMap在多线程环境下也可以使用Collections.synchronizedMap()方法来获取一个线程安全的集合。Collections.synchronizedMap()实现原理是Collections定义了一个SynchronizedMap的内部类,这个类实现了Map接口,在调用方法时使用synchronized来保证线程同步,虽然synchronized不再放在方法上,而是放在方法内部,使用this作为互斥量作为同步块出现,但仍然是对象级别的同步锁,和HashTable没有太大区别。HashTable已经被淘汰了,如果不需要线程安全,那么使用HashMap,如果需要线程安全,那么使用ConcurrentHashMap

HashMap并发不安全:两个put的key发生了碰撞(hash值一样),那么根据HashMap的实现,这两个key会添加到数组的同一个位置,这样最终就会发生其中一个线程的put的数据被覆盖。此外如果多个线程同时检测到元素个数超过数组大小*loadFactor,会发生多个线程同时对hash数组进行扩容,可能会引起链表成环而导致死循环的错误。ConcurrentHashMap是并发安全的,插入数据时通过原子操作判断哈希桶下有没有其它线程对数据进行了修改,保证了同时只有一个线程修改链表,防止出现链表成环,然后开始写入数据;读取时按照链表或者红黑树正常读取即可(Node字段value、next都用了volatile修饰,保证了可见性)。

ConcurrentHashMap底层使用Node(value 、next都用了volatile修饰,保证了可见性)数组+链表+红黑树的数据结构来实现,并发控制使用synchronizedCAS来操作:写入数据时调用Unsafe的本地方法获取指定内存中的数据,保证每次拿到的数据都是最新的,定位出的节点如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则不断尝试保证成功;如果获得的元素非空利用synchronized锁住桶中的第一个节点,这里是对数组元素加锁(Node),无论是相较于Collections.synchronizedXXX返回的包装类还是HashTable,加锁粒度更细,多个桶可以并发读写。

并发问题的三个来源:原子性、可见性、有序性。ConcurrentHashMap只能保证提供的原子性读写操作是线程安全的,也就是put()get()操作是线程安全的。可见性问题: CPU 在计算时优先从离自己最近、速度最快的 CPU 缓存中获取数据去计算,其次再从内存中获取数据,导致数据不一致。原子性问题:比如注册用户,使用先判断是否存在,再写入数据,如果两个线程同时发现用户不存在,之后都进行写数据,将导致出现重复添加问题,可以两个操作放在一起执行完,这与数据库事务的原子性理解差不多。有序性:编译器为了提高性能有时候会改变代码执行的顺序,对于单线程代码指令重排序对于执行没有什么影响,但是会对多线程并发代码执行产生不可预知的结果。提供的putIfAbsent接口,其含义是如果 key 已经存在则返回存储的对象,否则返回null,这个方法实现加了synchronized锁,为线程安全。在这个场景中如果不使用putIfAbsent就要对register(User user)方法加锁,对于性能的影响更大。

2,底层实现

HashTable底层实现是数组+单链表;HashMap是数组+单链表/红黑树;ConcurrentHashMap是数组+单链表/红黑树。HashMap的初始容量为16,Hashtable初始容量为11,两者的填充因子默认都是0.75。HashMap扩容时是capacity*2Hashtable扩容时是capacity*2+1。HashTable会尽量使用素数、奇数。而HashMap则总是使用2的幂作为哈希表的大小。当哈希表的大小为素数时,简单的取模哈希的结果会更加均匀,而HashMap使用对hashCode二次运算增强hash值得随机性,来弥补容量不是素数的缺点,同时将哈希表的大小固定为了2的幂可以用位运算来替代取余速度更快。

3,null处理

HashMap支持null键和null值,而HashTableConcurrentHashMap在遇到key或者value为null时,会抛出NullPointerException异常。这仅仅是因为HashMap在实现时对null做了特殊处理,将null的hashCode值定为了0,从而将其存放在哈希表的第0个bucket中,因此在HashMap中不能由get()方法来判断HashMap中是否存在某个key,应该用containsKey()方法(containsKey,根据key获得节点,通过判断节点是否为null来判断key是否存在,即使key和value都为null的节点,节点本身也不为null)来判断。而concurrenthashmap它们是用于多线程的,并发的,如果map.get(key)得到了null,不能判断到底是映射的value是null,还是因为没有找到对应的key而为空,相较于单线程状态的hashmap却可以用containKey(key)去判断到底是否包含了这个null;支持并发的ConcurrentHashMap在调用containskey和m.get(key)时ConcurrentHashMap可能已经不同了。

4,fail-fast

HashMap的迭代器是fail-fast(旨在停止正常运行,而不是尝试继续可能存在缺陷的过程)迭代器,而ConcurrentHashMapHashTable的迭代器不是fail-fast的,因为要支持并发。所以当在使用迭代器遍历HashMap时数据结构上被修改(增加或者移除元素不包括更新节点值),将会抛出ConcurrentModificationException,但迭代器本身的remove()方法移除元素则不会抛ConcurrentModificationException异常。

应用上下文

运行环境,保证应用的正常运行,具体就是管理应用所依赖的bean,并在需要该bean的地方注入依赖,容器是Spring框架实现功能的核心,容器不只是帮我们创建了对象那么简单,它负责了对象整个的生命周期的管理——创建、装配、销毁。应用上下文即是Spring容器的一种抽象化表述,是一维护Bean定义以及对象之间协作关第的高级接口。将需要管理的对象(Spring中我们都称之问bean)、bean之间的协作关系配置好,然后利用应用上下文对象加载进我们的Spring容器,容器就能为你的程序提供你想要的对象管理服务。

DefaultListableBeanFactory:这就是大家常说的 ioc 容器,它里面有很多 map、list。spring 帮我们创建的 singleton 类型的 bean 就存放在其中一个 map 中。扩展点集合:存放 spring 扩展点(BeanPostProcessor)接口的 list 集合。 启动其实就是调用refresh 完成 spring context 的初始化和启动过程:创建 BeanFactory、注册 BeanPostProcessor 等。

原子类

确保线程安全最常见的做法是利用锁机制来对共享数据做互斥同步,互斥同步最主要的问题是线程阻塞和唤醒所带来的性能问题。volatile 是轻量级的锁,它保证了共享变量在多线程中的可见性,但无法保证复合操作的原子性。

CAS:根据地址v取值A=get(v)->B=f(A)->A==get(v)->成立则将B写入v,失败则不断重复至成功,实现非阻塞同步(乐观锁);缺点:重复读取get(v),单变量原子性(封装,AtomicReference),ABA问题(AtomicStampedReference,版本号比较)。

使用private volatile int value;保证值得可见性以及禁止重排序,将volatile的概念延伸到那些提供原子条件更新操作的字段和数组元素。

具体实现以getAndIncrement为例:

volatile

可见性:每次读取volatile时,都会看到任意一个线程对该volatile的最后一次写入。确保在写入后将其从高速缓存(cache)中刷新到主存(memory),以便它们可以立即对其他线程可见。 同样,在读取volatile字段之前,必须使高速缓存无效,以便可以看到主内存中的值而不是本地处理器高速缓存中的值。

重排序:由于对volatile 字段重新排序添加了严格约束,因此当线程A写入volatile 字段f时,对线程A可见的任何内容,这些内容在线程B读取f时都可见。读取在写入之后,保证正确的happens-before 关系。

内存屏障,使中央处理单元(CPU)或编译器对于在屏障指令之前和之后发出的存储器操作执行一种排序约束,可以保证在屏障之前发布的操作可以在屏障之后发布的操作之前执行。

原子性:保证对变量的单次读写是原子的(即使是64位数据类型),而x++为复合操作,不具备原子性。

状态保存

HTTP 无状态协议。

接收消息

1,前端传送的是JSON字符串格式的弹幕信息对象,在收到消息后,查询WEBSOCKET_MAP获得当前视频的在线用户连接信息。构建键值对<sessionId,danmu>表示要向sessionId的连接会话发送指定的弹幕信息。然后将发送任务交给信息队列后立即返回,进行下一个用户弹幕的发送,而不必等待当前弹幕真正发送完成才返回,实现异步处理以及削峰,而在消息的接收端,根据服务器最大吞吐量时的并发处理数量,决定每次从MQ中拉取的任务数,完成弹幕信息的真正发送。注意由由于WebSocketService非单列,需要手动从ApplicationContext获取需要的依赖。

在向用户推送完成弹幕信息后,将弹幕保存到数据库和redis。由于弹幕保存到数据库对用户无感,且操纵耗时,可以使用异步操作,减轻数据库压力,同时让调用立即返回不必等待最终结果,调高了并发处理能力;之后使用redis保存新添加弹幕,使用的key是视频的id,value是该视频对应的弹幕,下次读取弹幕直接从redis中读取,避免访问数据库,降低数据库压力,由于redis读写快速,可以使用同步操作,同时也是为了保证下一次查询数据时,即使数据还没落盘,也能在redis中找到。存储的key为dm-video-videoId,value为当前视频对应的弹幕列表对象格式化后的字符串。

将弹幕推送消息发送到交换机user-danmu,经过exchange到达消息队列q-user-danmu

消息接收者监听消息队列q-user-danmu,从消息队列中获取消息并发送给指定用户。

消息队列运行状态:

image-20220422163639433

用户0收到其它用户和自己(0,1,2)发送的实时弹幕:

异步保存数据到数据库:

@Async

将任务交于线程池,由指定的线程池中的线程执行,调用方立即返回。

1,需要配置执行的线程池和异常处理。执行的线程池默认情况下使用org.springframework.core.task.TaskExecutor,或者一个 Bean 的 Name 为 taskExecutor 的 java.util.concurrent.Executor 作为执行任务的线程池。如果都没有的话,会创建 SimpleAsyncTaskExecutor线程池来处理异步方法调用,当然 @Async 注解支持一个 String 参数,来指定一个类型是 Executor 或 TaskExecutor的Bean,表示使用这个指定的线程池来执行这个异步任务。

@Async 标记的方法只能是 void 或者 Future 返回值,在无返回值的异步调用中,异步处理抛出异常,并会捕获指定异常,原有任务还会继续运行,直到结束。而在有返回值的异步调用中,异步处理抛出了异常,会直接返回主线程处理,异步任务结束执行,主线程也会被异步方法中的异常中断结束执行。

本质基于动态代理实现:Spring容器启动初始化bean时,判断类中是否使用了@Async注解,创建切入点和切入点处理器,根据切入点创建代理,在调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,根据要执行的任务创建Callable接口对象,将方法的执行提交给线程池,实现异步执行。所以如果A类的a方法(没有标注@Async)调用它自己的b方法(标注@Async)是不会异步执行的,因为从a方法进入调用的都是它本身,不会进入代理。

创建AOP代理的切面:

异步方法的执行:

注意事项:默认线程池大小、核心线程大小为INT的最大值,当方法频繁调用的时候,异步任务的数量就会大量增长,如果任务处理不够快,就很可能会出现内存溢出的情况。最好手动配置线程池参数。

2,在消息队列的接收端,根据传递过来的<sessionId,danmu>,向sessionId的连接会话发送指定的弹幕信息,完成弹幕信息的真正发送。将任务的调用方与真正执行任务方解耦,往消息队列中写入消息的一方与从队列中读取消息的一方相关性较低,二者可能由不同技术栈实现,也可能不在同一台物理机器上。

Redis

性能优秀,数据在内存中,读写速度非常快。单进程单线程,是线程安全的,采用 IO 多路复用机制。丰富的数据类型,支持字符串 (strings)、散列(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets) 等。支持数据持久化,可以将内存中数据保存在磁盘中,重启时加载。主从复制,哨兵,高可用。

1,数据类型

2,有效期

Redis 中有个设置缓存时间过期的功能,即对存储在 redis 数据库中的值可以设置一个过期时间。

3,内存淘汰机制

如果定期删除漏掉了很多过期 key,然后也没及时去查,也就没走惰性删除,如果大量过期 key 堆积在内存里,导致 redis 内存块耗尽。

4,持久化机制

5,缓存雪崩

缓存同一时间大面积的失效,所以后面的请求都会落到数据库上,造成数据库短时间内承受大量请求而崩掉。

解决办法:1,尽量保证整个 redis 集群的高可用性,发现机器宕机尽快补上;2,把每个 Key 的失效时间都加个随机值,保证数据不会再同一时间大面积失效。3,选择合适的内存淘汰策略,防止爆内存。对请求限流 ,避免 MySQL在redis崩溃后也崩掉,只要数据库正常工作,就可以处理用户请求,保证系统仍然可用。4,利用 redis 持久化机制保存的数据尽快恢复缓存。

14534869-cefa2f5519af3a09

6,缓存穿透

大量请求缓存中不存在的数据,导致所有的请求都落到数据库上,造成数据库短时间内承受大量请求而崩掉。

解决办法:1,在接口层增加校验,比如用户鉴权,参数做校验;2,采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的 bitmap 中,用于快速判断出 Key 是否在数据库中存在,一个一定不存在的数据会被这个 bitmap 拦截掉,这个恶意请求就会被提前拦截,从而避免了对底层存储系统的查询压力。3,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟。4,在访问数据据时加上互斥锁。

7,缓存击穿

缓存击穿是指一个 Key 非常热点,在不停地扛着大量的请求,大并发集中对这一个点进行访问,当这个 Key 在失效的瞬间,持续的大并发直接落到了数据库上,就在这个 Key 的点上击穿了缓存。

解决办法:1,设置热点数据永不过期。2,在访问数据据时加上互斥锁。

8,底层实现

Redis 内部使用一个 redisObject 对象来表示所有的 key 和 value。type 表示一个 value 对象具体是何种数据类型,encoding 是不同数据类型在 Redis 内部的存储方式。

9,快

Redis 单进程单线程的模型,因为 Redis 完全是基于内存的操作,CPU 不是 Redis 的瓶颈,Redis 的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且 CPU 不会成为瓶颈,那就顺理成章的采用单线程的方案了。

10,主从复制

11,redis和数据库双写一致性问题

数据库和缓存双写,就必然会存在不一致的问题。如果对数据有强一致性要求,不能放缓存。如果一致性不是太高可以采取正确更新策略,先更新数据库,再删缓存。

12,工作模式

查询弹幕

查询策略是优先查redis中的弹幕数据,如果没有的话查询数据库,然后把查询的数据写入redis当中,以便下次查询。

客户端获取到的id为33的视频的弹幕数据

 

人数更新

由于在线观看的人数是不断变化的,需要不断更新客户端显示的在线人数。同样,如果使用短连接轮询的方式将对服务器造成较大的压力,这里利用之前处理弹幕信息的websocket连接,每隔一定时间,就向当前视频的在线用户发送更新后的在线人数。

用户0每隔5s都能收到服务端推送的视频在线人数