技术标签: java
疯狂创客圈 经典图书 : 《Netty Zookeeper Redis 高并发实战》 面试必备 + 面试必备 + 面试必备 【博客园总入口 】
疯狂创客圈 经典图书 : 《SpringCloud、Nginx高并发核心编程》 大厂必备 + 大厂必备 + 大厂必备 【博客园总入口 】
入大厂+涨工资必备: 高并发【 亿级流量IM实战】 实战系列 【 SpringCloud Nginx秒杀】 实战系列 【博客园总入口 】
Akka是JAVA虚拟机平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供了Scala和Java的开发接口。
Akka处理并发的方式,基于Actor模型实现。
Actor的概念来自于Erlang,在AKKA中,可以认为一个Actor就是一个容器,用以存储状态、行为、Mailbox以及子Actor与Supervisor策略。Actor之间并不直接通信,而是通过Mail来互通有无。
在Actor模式中,每个Actor都有一个(恰好一个)Mailbox。Mailbox相当于是一个小型的队列,一旦Sender发送消息,就是将该消息入队到Mailbox中。入队的顺序按照消息发送的时间顺序。Mailbox有多种实现,默认为FIFO。但也可以根据优先级考虑出队顺序,实现算法则不相同。
akka的核心,一个用于并发和分发的模型,没有线程原语的所有痛苦
一种直观而安全的方式来实现异步、非阻塞的回压流处理。
现代的、快速的、异步的、流的HTTP服务器和客户端。
通过在多个节点上分布您的系统来获得弹性和弹性。
根据用户的身份,在集群中分配您的参与者。
最终一致,高度读取和写入可用,低延迟数据
为参与者的事件包允许他们在重新启动后到达相同的状态。
在云系统上运行Akka系统的扩展(k8s,aws,…)
Akka流连接器用于集成其他技术
对并发模型进行了更高的抽象
是异步、非阻塞、高性能的事件驱动编程模型
是轻量级事件处理(1GB内存可容纳百万级别个Actor)
它提供了一种称为Actor的并发模型,其粒度比线程更小,你可以在系统中启用大量的Actor。
它提供了一套容错机制,允许在Actor出现异常时进行一些恢复或重置操作。
Akka既可以在单机上构建高并发程序,也可以在网络中构建分布式程序,并提供位置透明的Actor定位服务。
在并发程序中线程是并发程序的基本执行单元,但在Akka中执行单元是Actor。Actor模型是1973年提出的一个分布式并发编程模式,在Erlang语言中得到广泛支持和应用。
在Actor模型中并不是通过Actor对象的某个方法来告诉Actor需要做什么,而是给Actor发送一条消息。当一个Actor收到消息后,它有可能根据消息的内容做出某些行为,如更改自身状态,此时这个状态的更改是Actor自身进行的,并非由外界干预进行的。
在Erlang中,每段代码都运行在进程中,进程是Erlang中对Actor的称呼,意味着它的状态不会影响其他进程。系统中会有一个supervisor,实际上它只是另一个进程。被监控的进程挂掉了,supervisor会被通知并对此进行处理,因此也就能创建一个具有自愈功能的系统。如果一个Actor到达异常状态并且崩溃,无论如何,supervisor都可以做出反应并尝试把它变成一致状态,最常见的方式就是根据初始状态重启Actor。
简单来说,Actor通过消息传递的方式与外界通信,而且消息传递是异步的。每个Actor都有一个邮箱,邮箱接收并缓存其他Actor发过来的消息,通过邮箱队列mail queue来处理消息。Actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息外不能做任何其他操作。
每个Actor是完全独立的,可以同时执行他们的操作。每个Actor是一个计算实体,映射接收到的消息并执行以下动作:发送有限个消息给其他Actor、创建有限个新的Actor、为下一个接收的消息指定行为。这三个动作没有固定的顺序,可以并发地执行,Actor会根据接收到的消息进行不同的处理。
传统并发程序是基于面向对象的方法,通过对象的方法调用进行信息传递,如果对象的方法修改对象本身的状态,在多线程下就可能出现对象状态的不一致,此时就必须对方法调用进行同步,而同步操作会牺牲性能。
例如,两个线程同时尝试购买最后一件商品时,如果没有锁就可能出现多个线程同时断定计数器的值大于或等于购买数量,然后错误地递减计数器,从而导致出现负数,出现线程安全问题。
为了防止线程安全问题,就需要用到锁。
以使用Java轻量级锁为列,在高度竞争的阶段,很有可能出现很长的线程队列,他们都在等待递减计数器。但使用队列的方式的问题在于可能造成众多阻塞线程,也就是每个线程都在等待轮到它们去执行一个序列化的操作。
所以,不合理的使用锁,很可能将多核多线程的应用,变成单线程的应用,或者导致工作线程之间存在高度竞争。
Actor模型优雅的解决了这个难题,为真正多线程的应用提供了一个基础支持。
Actor模型把系统中所有事物都抽象成为一个Actor角色,在一个系统中可以将一个大规模的任务分解为一些小任务,这些小任务可以由多个Actor并发处理,从而减少任务的完成时间。
为响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor角色的职责:
Actor的输入是接收到的消息
Actor接收到消息决定如何处理:如创建更多的actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息
Actor处理完成任务后可以发送消息给其它Actor
Actor模型的一个好处是可以消除共享状态:Actor每次只能处理一条消息,所以Actor内部可以安全的处理状态,而不用考虑锁机制。
在Actor模型中主角是actor,类似一种worker。Actor彼此之间直接发送消息,不需要经过什么中介,消息是异步发送和处理的。在Actor模型中一切都是Actor,所有逻辑或模块都可以看成是Actor,通过不同Actor之间的消息传递实现模块之间的通信和交互。
Actor是由状态(state)、行为(behavior)、邮箱(mailbox)三者组成的。
状态(state):状态是指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性等问题。
行为(behavior):行为指定的是actor中计算逻辑,通过actor接收到的消息来改变actor的状态。
邮箱(mailbox):邮箱是actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送发消息,而接收方则从邮箱中获取消息。
光有一个actor是不够的,多个actors才能组成系统。在Actor模型中每个actor都有自己的地址,所以他们才能相互发送消息。需要指明的一点是,尽管多个actors同时运行,但是一个actor只能顺序地处理消息。也就是说其它actor发送多条消息给一个actor时,这个actor只能一次处理一条。如果需要并行的处理多条消息时,需要将消息发送给多个actor。
消息是异步的传送到actor的,所以当actor正在处理消息时,新来的消息应该存储到别的地方,也就是mailbox消息存储的地方。
每个actor都有且仅有一个mailbox,mailbox相当于一个小型的队列,一旦sender发送消息,就将该消息入队到mailbox中。入队的顺序按照消息发送的时间顺序。
)
Actor模型描述了一组为避免并发编程的公理:
所有的Actor状态是本地的,外部是无法访问的。
Actor必须通过消息传递进行通信。
一个Actor可以响应消息、退出新Actor、改变内部状态、将消息发送到一个或多个Actor。
Actor可能会堵塞自己但Actor不应该堵塞自己运行的线程
第一行打印了HelloWorld Actor的路径,它是系统内第一个被创建的Actor。路径为:akka://hello/user/helloworld。第一个hello表示ActorSystem的系统名称,即构造时第一个入参。user表示用户Actor,所有的用户Actor都会挂载在user路径下。最后helloworld是这个Actor的名字。
第二行打印了Greeter Actor的路径,三、四行为Greeter中输出的信息。
第五行表示系统遇到了一条消息投递失败,原因是HelloWorld将自身停止了,导致Greeter发送的信息无法成功投递。
当使用Actor进行并发开发时,关注点已经不在线程上了,线程调度已经被Akka框架进行封装,只需关注Actor对象即可。Actor对象之间的通过显示的消息发送来传递信息。
当系统内有多个Actor时,Akka会自动在线程池中选择线程来执行我们的Actor,不同的Actor可能会被同一个线程执行或者一个Actor可能被不同的线程执行。
注意:不要在一个Actor中执行耗时的代码,这样可能会导致其他Actor的调度出现问题。
Akka用Scala语言编写,虽然提供了Java的开发接口,但是基于Akka开发的非常少。但是,很多分布式框架都是用akka做的,比如flink的分布式通信[3]就依赖Akka。
但是对于应用级团队,性能总是能满足需求即可,而不需要追求性能极限。中间件越能reliable,那么开发则越省心越快。应用开发团队当然更喜欢使用分布式中间件,比如Hadoop,spark,hive,flink,Kinesis,Kafka,Storm等组件来解决问题.
所以,Akka对于应用开发来说, 可以不用学习。但是对于架构师来说, 是一定需要学习的。
至少,Akka的原理非常值得学习。
以下程序演示了akka的一个简单的示例。创建Actor去处理一条命令,通过消息传递的方式进行交互。
引入依赖:
<dependency >
<groupId> com.typesafe.akka</groupId >
<artifactId>akka-actor_2.10</artifactId>
<version>2.3.10</version>
</dependency>
<dependency >
<groupId> com.typesafe.akka</groupId >
<artifactId>akka-persistence-experimental_2.10</artifactId>
<version>2.3.10</version>
</dependency>
//创建命令对象
@Data
@AllArgsConstructor
static class Command implements Serializable
{
private static final long serialVersionUID = 1L;
private String data;
}
//创建Actor对象
static class SimpleActor extends UntypedActor
{
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public SimpleActor()
{
log.info("SimpleActor constructor");
}
@Override
public void onReceive(Object msg) throws Exception
{
log.info("Received Command: " + msg);
if (msg instanceof Command)
{
final String data = ((Command) msg).getData();
// emmit an event somewhere...
} else if (msg.equals("echo"))
{
log.info("ECHO!");
}
}
}
public static void main(String[] args) throws InterruptedException
{
final ActorSystem actorSystem = ActorSystem.create("actor-system");
Thread.sleep(5000);
final ActorRef actorRef = actorSystem.actorOf(Props.create(SimpleActor.class), "simple-actor");
actorRef.tell(new Command("CMD 1"), null);
actorRef.tell(new Command("CMD 2"), null);
actorRef.tell(new Command("CMD 3"), null);
actorRef.tell(new Command("CMD 4"), null);
actorRef.tell(new Command("CMD 5"), null);
Thread.sleep(5000);
log.debug("Actor System Shutdown Starting...");
actorSystem.shutdown();
}
[INFO] [11/01/2020 18:12:15.303] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] SimpleActor constructor
[INFO] [11/01/2020 18:12:15.306] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 1)
[INFO] [11/01/2020 18:12:15.306] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 2)
[INFO] [11/01/2020 18:12:15.307] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 3)
[INFO] [11/01/2020 18:12:15.307] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 4)
[INFO] [11/01/2020 18:12:15.308] [actor-system-akka.actor.default-dispatcher-3] [akka://actor-system/user/simple-actor] Received Command: AkkaDemo.Command(data=CMD 5)
需求:使用多线程找出1000000以内素数个数
共享内存方式的处理流程如下:
传统方式通过锁/同步的方式实现并发,每次同步获取当前值并让一个线程去判断值是否为素数,若是的话则通过同步方式对计数器加一。
Actor模型方式的处理流程如下:
使用Actor模型方式会将此过程拆分成多个模块,即拆分成多个Actor。每个Actor负责不同部分,并通过消息传递让多个Actor协同工作。
存在的问题:当用户A Actor扣款期间,用户B Actor是不受限的,此时对用户B Actor进行操作是合法的,针对这种情况,单纯的Actor模型就显得比较乏力,需要加入其他机制来保证一致性。
说明:以上案例,仅仅作为模式的参考,并没有提供参考实现,具体的实现代码,大家可以来疯狂创客圈社群交流
Akka应用是由消息驱动的,消息是除Actor之外最重要的核心组件。在Actor之间传递的消息应该满足不变性,即不变模式,可变的消息无法高效的在并发环境中使用。在Akka中推荐使用不可变对象。在代码中可以使用final字段声明,在消息构造完成后,就不能再发生改变了。
Akka的消息投递策略:
至多一次投递:此策略中每一条消息最多会被投递一次,可能会偶尔出现投递失败的情况,从而导致消息丢失。此策略高性能。
至少一次投递:此策略中每一条消息至少会被投递一次,直至成功。在一些偶然的情况,接收者可能会收到重复消息,但消息不会丢失。此策略需要保存消息投递的状态并不断重试。
精确投递:所有消息保证被精确的投递并成功接收一次,既不会丢失也不会重复。此策略成本最高且不易实现。
关于消息的可靠性:没有必要在Akka层保证消息的可靠性,这样做成本太高且无必要。消息可靠性应该在应用的业务层进行保证,有时丢失一些消息是符合应用要求的。
消息投递的顺序性:Akka可以保证在一定程度上的投递顺序性。如Actor A1向A2顺序发送了M1、M2、M3三条消息,Actor A3向A2顺序发送了M4、M5、M6三条消息,则系统可以保证:
如果M1无丢失,它一定先于M2、M3被A2收到。
如果M2无丢失,它一定先于M3被A2收到。
如果M4无丢失,它一定先于M5、M6被A2收到。
如果M5无丢失,它一定先于M6被A2收到。
对于A2来说,来自A1向A3的消息并没有顺序性保证。
另外这种消息投递规则不具备可传递性,如下图:
C收到M1和M2的顺序是没有保证的
疯狂创客圈 - Java高并发研习社群,为大家开启大厂之门
这是我第一次在简书上写的第一篇文章,其实我关注简书有一两个月吧!平时我就在这里看看别人的文章,很有冲动自己也能像别人一样滔滔不绝的写作,可是我有自知自明根本不能达到那个高度。哈哈!而这一篇也是我今天在简书写的第一篇,因此我也喜欢我能写更多的不管是技术方面还是其他方面。下面这些内容如果有任何问题欢迎提出,由于工作时间不是很长如有问题还望大神指正,进而改之。废话不多说先看要实现的效果图:设计效果图所先..._android支付界面
最新需要做一个表格,有坐标轴,轴上有点,点间连线,想到了canvas,查找官方API,发现还是很简单的,不过毕竟是第一次接触html5,画线也费些时间。先说说canvas,初始化要设置宽高,而且不能用样式表,必须在canvas的属性中设置,然后调用API画线即可。代码如下:var c = document.getElementById("myCanvas");//初始化var ctx =_canvas连线
最近在使用DsoFramer第三方Office控件,遇到如题描述的问题,在网上查找到如下解决方案,发布出来和大家共享下,也给自己做个记录。 出现题目的异常,多是引用第三方控件引起的。在NEW时,需要初始化该对象。 AxESACTIVEXLib.AxESActiveX ax = new AxESACTIVEXLib.AxESActiveX();_system.windows.forms.axhost.invalidactivexstateexception:鈥淓xception_wasthro
sht20和 arduino UNO的接线方法:SCL - A5SDA - A4VCC-3.3VGND-GND用到的库函数库函数下载地址:上传中#include <Wire.h>#include "DFRobot_SHT20.h" DFRobot_SHT20 sht20; void setup(){ Serial.begin(9600); Serial.println("SHT20 Example!"); sht20.initSHT20._sht20 ardunio
示例代码@ComponentScan("org.springframework.examples")public class AnnotationConfigApplicationContextTest { public static void main(String[] args) { AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationC_spring beandefinition 添加注解
上一节中我们分析了linux kernel中igmp proxy相关的数据结构与实现需求分析,本节我们分析kernel中对组播数据流和组播数据的处理流程。对于目的ip地址为组播地址的数据,可以分为两类:1、 四层协议类型为igmp的igmp管理数据包2、 四层协议类型为udo的组播数据流而协议栈ip层数据处理完后,在函数ip_rcv_finish里则会进行组播路由的查找,并根据组播数据的类型会..._igmpproxy 如何添加路由缓存
下载依赖compilersudo apt install build-essential requiredsudo apt install cmake git libgtk2.0-dev pkg-config libavcodec-dev libavformat-dev libswscale-devoptionalsudo apt install libjpeg-dev libpn...
大家都知道WPF带来了很多新的特性,其中一个就是引入了一种新的属性机制——依赖属性。依赖属性出现的目的是用来实现WPF中的样式、自动绑定及实现动画等特性。依赖属性的出现是WPF这种特殊的呈现原理派生出来的,与.NET普通属性不同的是,依赖属性的值是依靠多个提供程序来判断的,并且其具有内建的传递变更通知的能力。依赖属性基本应用在了WPF的所有需要设置属性的元素。依赖属性根据多个提供对象来决定它的值..._依赖属性
内容主要摘自以下两个链接:https://www.cnblogs.com/LJWJL/p/3481995.htmlhttps://www.cnblogs.com/LJWJL/p/3481807.html现在FPGA编译器都支持verilog有符号运算的综合,并且综合后的有符号数都是以补码形式存在,明白点说,就是编译器可以自动把有符号数编码成补码形式。具体在有符号数处理过程中注意...
1260: [CQOI2007]涂色paintTime Limit: 30 SecMemory Limit: 64 MB Submit: 1575Solved: 955 [Submit][Status][Discuss]Description假设你有一条长度为5的木版,初始时没有涂过任何颜色。你希望把它的5个单位长度分别涂上红、绿、蓝、绿、红色,用一个长度为5的字符串表示这...
无感FOC电机控制代码,算法采用滑膜观测器,SVPWM控制,启动采用Vf,全开源代码,很有参考价值。带原理图,SMO推导,附有相关的文档资料, matlab模型,电机控制资料。_foc滑膜观测器
指令一、定义:指令只一种可以附加到DOM元素的微命令(tiny commands). 它们通常以"v-"作为前缀, 以方便Vue知道你在使用一种特殊的标记, 从而确保语法的一致性. 如果你需要对HTML元素的低级别(low-level)访问来控制一些行为, 它们通常很有用.五个钩子函数:bind: 只调用一次,指令第一次绑定到元素时调用,用这个钩子函数可以定义一个在绑定时执...