Spring Boot and RESTful API(14)Spring Boot with AKKA
Logging Performance
https://www.slf4j.org/faq.html#logging_performance
logger.debug("The new entry is {}. It replaces {}.", entry, oldEntry);
Add AKKA to my Spring Boot Project
pom.xml
<!-- Akka core actor library; the _2.12 artifact suffix selects the build for Scala 2.12. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.4.19</version>
</dependency>
<!-- Akka SLF4J bridge; keep its version identical to akka-actor above. -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>2.4.19</version>
</dependency>
In the main class, the application initializes the Akka actor system on startup.
package com.sillycat.jobsmonitorapi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.sillycat.jobsmonitorapi.service.JobDynamicCronService;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
/**
 * Spring Boot entry point of the jobs monitor API.
 *
 * <p>Once the Spring context is running, {@link #main(String[])} creates the top-level
 * {@code outOfBudgetSupervisor} actor and then starts the cron tasks.
 */
@SpringBootApplication
@EnableScheduling
public class JobsMonitorAPIApplication extends SpringBootServletInitializer {

    private static final Logger logger = LoggerFactory.getLogger(JobsMonitorAPIApplication.class);

    /**
     * Starts the Spring context, creates the supervisor actor and starts the cron service.
     *
     * @param args command-line arguments; forwarded to Spring Boot
     * @throws Exception propagated from context start-up, actor creation or cron start-up
     */
    public static void main(String[] args) throws Exception {
        // Forward args so command-line property overrides (e.g. --server.port=...) are not dropped.
        ConfigurableApplicationContext context = SpringApplication.run(JobsMonitorAPIApplication.class, args);

        logger.info("Start to init the AKKA system...");
        SpringExtension ext = context.getBean(SpringExtension.class);
        ActorSystem system = context.getBean(ActorSystem.class);
        // The supervisor is built from the Spring bean of the same name and uses the
        // mailbox configured under "akka.priority-mailbox".
        ActorRef supervisor = system.actorOf(
                ext.props("outOfBudgetSupervisor").withMailbox("akka.priority-mailbox"),
                "outOfBudgetSupervisor");
        logger.info("supervisor init with path {}", supervisor.path());
        logger.info("AKKA system success inited...");

        logger.info("Start the cron tasks to monitor ");
        context.getBean(JobDynamicCronService.class).startCron();
        logger.info("Cron tasks started! ");
    }
}
In the configuration package, Spring automatically sets up the Akka environment.
package com.sillycat.jobsmonitorapi.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorSystem;
/**
 * Spring configuration that exposes the Akka {@link ActorSystem} and the Typesafe
 * {@link Config} it is built from as beans, and scans the
 * {@code com.sillycat.jobsmonitorapi.akka} package for components.
 *
 * <p>NOTE(review): no explicit destroy method is declared for the actor system bean;
 * confirm that the system is terminated when the Spring context closes.
 */
@Configuration
@Lazy
@EnableAutoConfiguration
@ComponentScan(basePackages = { "com.sillycat.jobsmonitorapi.akka" })
public class ApplicationConfiguration {

    @Autowired
    private SpringExtension springExtension;

    @Autowired
    private ApplicationContext applicationContext;

    /**
     * Loads the Akka configuration through {@link ConfigFactory#load()}.
     *
     * @return the loaded configuration
     */
    @Bean
    public Config akkaConfiguration() {
        return ConfigFactory.load();
    }

    /**
     * Creates the actor system named {@code JobsMonitorAPIAKKA} and hands the Spring
     * context to the {@link SpringExtension}.
     *
     * @return the newly created actor system
     */
    @Bean
    public ActorSystem actorSystem() {
        Config akkaConfig = akkaConfiguration();
        ActorSystem actorSystem = ActorSystem.create("JobsMonitorAPIAKKA", akkaConfig);
        // Only after this call can the extension build Props for Spring-managed actors.
        springExtension.initialize(applicationContext);
        return actorSystem;
    }
}
My actor system is named JobsMonitorAPIAKKA. A few base classes let actors be created from Spring beans and let Spring beans obtain actors.
package com.sillycat.jobsmonitorapi.akka.base;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import com.sillycat.jobsmonitorapi.akka.base.SpringActorProducer;
import akka.actor.Extension;
import akka.actor.Props;
/**
 * Bridge between Spring and Akka: builds {@link Props} whose actor instances are
 * fetched from the Spring {@link ApplicationContext} by bean name.
 *
 * <p>{@link #initialize(ApplicationContext)} must be called before {@link #props(String)}.
 */
@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    /**
     * Stores the Spring context used to resolve actor beans.
     *
     * @param applicationContext the context that holds the actor bean definitions
     */
    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Creates {@link Props} for the actor registered in Spring under the given bean name.
     *
     * @param actorBeanName name of the actor bean
     * @return props backed by a {@link SpringActorProducer}
     * @throws IllegalStateException if {@link #initialize(ApplicationContext)} has not been called yet
     */
    public Props props(String actorBeanName) {
        // Fail fast with a clear message instead of handing a null context to the producer,
        // which would only surface later as an obscure NullPointerException.
        if (applicationContext == null) {
            throw new IllegalStateException(
                    "SpringExtension is not initialized with an ApplicationContext; cannot create Props for actor bean '"
                            + actorBeanName + "'");
        }
        return Props.create(SpringActorProducer.class, applicationContext, actorBeanName);
    }
}
package com.sillycat.jobsmonitorapi.akka.base;
import org.springframework.context.ApplicationContext;
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
/**
 * {@link IndirectActorProducer} that obtains the actor instance and its class from a
 * Spring {@link ApplicationContext}, looked up by bean name.
 */
public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext context;
    private final String beanName;

    /**
     * @param applicationContext Spring context holding the actor bean
     * @param actorBeanName      name of the actor bean to resolve
     */
    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName) {
        context = applicationContext;
        beanName = actorBeanName;
    }

    /**
     * Fetches the bean from the context and returns it as an actor.
     *
     * @return the bean cast to {@link Actor}
     */
    @Override
    public Actor produce() {
        Object bean = context.getBean(beanName);
        return (Actor) bean;
    }

    /**
     * Reports the type Spring has registered for the bean name.
     *
     * @return the bean type, cast unchecked to an actor class
     */
    @SuppressWarnings("unchecked")
    @Override
    public Class<? extends Actor> actorClass() {
        Class<?> beanType = context.getType(beanName);
        return (Class<? extends Actor>) beanType;
    }
}
A customized PriorityMailbox policy lets the system prioritize messages.
package com.sillycat.jobsmonitorapi.akka.base;
import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;
import com.typesafe.config.Config;
import akka.actor.ActorSystem;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
/**
 * Unbounded priority mailbox that orders {@link OutOfBudgetJobMsg} messages by their own
 * priority and gives every other message a priority of 100. A lower number means a more
 * important message.
 */
public class PriorityMailbox extends UnboundedPriorityMailbox {

    /**
     * Constructor with the {@code (settings, config)} signature; neither argument is
     * used by this implementation.
     *
     * @param settings actor system settings (unused)
     * @param config   mailbox configuration (unused)
     */
    public PriorityMailbox(ActorSystem.Settings settings, Config config) {
        super(new MessagePriorityGenerator());
    }

    /** Maps a message to its integer priority. */
    private static final class MessagePriorityGenerator extends PriorityGenerator {

        @Override
        public int gen(Object message) {
            if (!(message instanceof OutOfBudgetJobMsg)) {
                // default
                return 100;
            }
            OutOfBudgetJobMsg jobMsg = (OutOfBudgetJobMsg) message;
            return jobMsg.getPriority();
        }
    }
}
The message POJO is an ordinary class with one additional priority field.
package com.j2c.jobsmonitorapi.akka.msg;
/**
 * Message describing a job, plus a priority value.
 *
 * <p>Both constructors set the priority to 10. The rest of the class is elided by the
 * snip marker below.
 *
 * <p>NOTE(review): the package declaration preceding this class is
 * {@code com.j2c.jobsmonitorapi.akka.msg}, while the other listings import this class
 * from {@code com.sillycat.jobsmonitorapi.akka.msg} - confirm which one is correct.
 */
public class OutOfBudgetJobMsg {
private String id;
private Integer sourceID;
private Integer campaignID;
private String jobReference;
private Boolean paused;
private Boolean dailyCapped;
private Boolean deduped;
private Boolean expired;
// Boxed Integer: would be null if never assigned; both constructors below assign 10.
private Integer priority;
/**
 * Creates a message that carries only the id; all other job fields stay null.
 *
 * @param id value stored in the id field
 */
public OutOfBudgetJobMsg(String id) {
this.id = id;
this.priority = 10;
}
/**
 * Creates a message with every job field set and the priority set to 10.
 */
public OutOfBudgetJobMsg(String id, Integer sourceID, Integer campaignID, String jobReference, Boolean paused,
Boolean dailyCapped, Boolean deduped, Boolean expired) {
this.id = id;
this.sourceID = sourceID;
this.campaignID = campaignID;
this.jobReference = jobReference;
this.paused = paused;
this.dailyCapped = dailyCapped;
this.deduped = deduped;
this.expired = expired;
this.priority = 10;
}
…snip...
}
Here is my Actor
package com.sillycat.jobsmonitorapi.akka.actor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;
import com.sillycat.jobsmonitorapi.domain.JobCloud;
import com.sillycat.jobsmonitorapi.repository.JobCloudPartialRepositorySolr;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
 * Worker actor that writes the state carried by an {@link OutOfBudgetJobMsg} back to
 * Solr through {@link JobCloudPartialRepositorySolr}.
 *
 * <p>Declared as a prototype-scoped Spring bean, so each lookup of the bean yields a
 * new instance.
 */
@Component
@Scope("prototype")
public class OutOfBudgetActor extends UntypedActor {

    // Log source named after this actor, matching the convention used by OutOfBudgetSupervisor.
    private final LoggingAdapter logger = Logging.getLogger(getContext().system(), "OutOfBudgetActor");

    @Autowired
    JobCloudPartialRepositorySolr jobCloudPartialRepositorySolr;

    /**
     * Updates Solr for every {@link OutOfBudgetJobMsg}; any other message is logged and
     * passed to {@code unhandled}.
     *
     * @param message the received message
     * @throws Exception propagated from the repository update
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof OutOfBudgetJobMsg) {
            logger.debug("Message we receive is {}", message);
            OutOfBudgetJobMsg msg = (OutOfBudgetJobMsg) message;
            JobCloud job = new JobCloud(msg.getId(), msg.getSourceID(), msg.getCampaignID(),
                    msg.getJobReference(), msg.getPaused(), msg.getDailyCapped(), msg.getDeduped(),
                    msg.getExpired());
            // NOTE(review): the meaning of the boolean flag is defined by the repository - confirm.
            jobCloudPartialRepositorySolr.update(job, false);
            logger.debug("Update the status back to SOLR ");
        } else {
            logger.error("outOfBudgetActor receive msg = {} msg type = {}", message, message.getClass());
            unhandled(message);
        }
    }
}
Here is my SupervisorActor
package com.sillycat.jobsmonitorapi.akka.actor;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.sillycat.jobsmonitorapi.akka.base.SpringExtension;
import com.sillycat.jobsmonitorapi.akka.msg.OutOfBudgetJobMsg;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.Routee;
import akka.routing.Router;
import akka.routing.SmallestMailboxRoutingLogic;
@Component
@Scope("prototype")
public class OutOfBudgetSupervisor extends UntypedActor {
private final LoggingAdapter logger = Logging.getLogger(getContext().system(), "OutOfBudgetSupervisor");
@Autowired
private SpringExtension springExtension;
private Router router;
@Value("${outofBudgetActorCount}")
private Integer outofBudgetActorCount;
public void preStart() throws Exception {
logger.info("Init the operation on OutOfBudget...");
List<Routee> routees = new ArrayList<Routee>();
for (int i = 0; i < this.outofBudgetActorCount; i++) {
ActorRef actor = getContext().actorOf(springExtension.props("outOfBudgetActor"));
getContext().watch(actor);
routees.add(new ActorRefRoutee(actor));
}
router = new Router(new SmallestMailboxRoutingLogic(), routees);
super.preStart();
logger.info("Init the operation on OutOfBudget success!");
}
public void onReceive(Object msg) throws Exception {
if (msg instanceof OutOfBudgetJobMsg) {
router.route(msg, getSender());
} else if (msg instanceof Terminated) {
router = router.removeRoutee(((Terminated) msg).actor());
ActorRef actor = getContext().actorOf(springExtension.props("outOfBudgetActor"));
getContext().watch(actor);
router = router.addRoutee(new ActorRefRoutee(actor));
} else {
logger.error("Unable to handle msg {}", msg);
}
}
public void postStop() throws Exception {
logger.info("Shutting down OutOfBudgetSupervisor");
super.postStop();
}
}
The configuration file application.conf is as follows:
# Akka settings: logging through SLF4J plus a custom priority mailbox.
akka {
# Logger implementation class used by Akka.
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Log level used by the configured loggers (see "loggers") as soon
# as they have been started; before that, see "stdout-loglevel"
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = "DEBUG"
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "DEBUG"
# Mailbox definition; because it is nested inside "akka", its full path is
# "akka.priority-mailbox".
priority-mailbox {
mailbox-type = "com.sillycat.jobsmonitorapi.akka.base.PriorityMailbox"
}
}
References:
http://kimrudolph.de/blog/spring-boot-meets-akka
https://github.com/krudolph/akkaflow
https://www.slf4j.org/faq.html#logging_performance
Spring Boot and RESTful API(14)Spring Boot with AKKA
猜你喜欢
转载自sillycat.iteye.com/blog/2404510
今日推荐
周排行