diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/.gitignore b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/.gitignore new file mode 100644 index 0000000..349cac9 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/.gitignore @@ -0,0 +1,18 @@ +*.class +*.log + +# sbt specific +.cache/ +.history/ +.lib/ +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.worksheet +.idea \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/LICENSE b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/LICENSE new file mode 100644 index 0000000..e06d208 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/README.md b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/README.md new file mode 100644 index 0000000..b8225f2 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/README.md @@ -0,0 +1,6 @@ +Implementing the Reactive Manifesto with Akka +--- + +Source obtained from [github.com/adamw/reactive-akka-pres](https://github.com/adamw/reactive-akka-pres/). + +Dataset: http://stat-computing.org/dataexpo/2009/the-data.html diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.properties b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.properties new file mode 100644 index 0000000..748703f --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.7 diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.sbt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.sbt new file mode 100644 index 0000000..e2df08f --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/build.sbt @@ -0,0 +1,24 @@ +organization := "com.softwaremill" + +name := "reactive-akka-pres" + +version := "0.1-SNAPSHOT" + +scalaVersion := "2.11.6" + +val akkaVersion = "2.3.9" + +libraryDependencies ++= Seq( + // akka + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-persistence-experimental" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-contrib" % akkaVersion, + "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M5", + // util + "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2", + "ch.qos.logback" % "logback-classic" % "1.1.3", + "org.scalatest" %% "scalatest" % "2.2.4" % "test", + "joda-time" % "joda-time" % "2.7", + "org.joda" % "joda-convert" % "1.7" +) diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-client-template.conf b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-client-template.conf new file mode 100644 index 0000000..210b59b --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-client-template.conf @@ -0,0 +1,12 @@ +akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + + remote.netty.tcp.port = 0 + remote.netty.tcp.hostname = "127.0.0.1" +} + +cluster.client.initial-contact-points = [ + "akka.tcp://receiver@127.0.0.1:9171", + "akka.tcp://receiver@127.0.0.1:9172", + "akka.tcp://receiver@127.0.0.1:9173" +] \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-receiver-template.conf b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-receiver-template.conf new file mode 100644 index 0000000..95fe2f7 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/cluster-receiver-template.conf @@ -0,0 +1,19 @@ +akka { + actor.provider = "akka.cluster.ClusterActorRefProvider" + + remote.netty.tcp.port = 0 // should be overriden + remote.netty.tcp.hostname = "127.0.0.1" + + cluster { + seed-nodes = [ + "akka.tcp://receiver@127.0.0.1:9171", + "akka.tcp://receiver@127.0.0.1:9172", + "akka.tcp://receiver@127.0.0.1:9173" + ] + auto-down-unreachable-after = 10s + roles = [ "receiver" ] + role.receiver.min-nr-of-members = 2 + } + + extensions = [ "akka.contrib.pattern.ClusterReceptionistExtension" ] +} \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide1.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide1.txt new file mode 100644 index 0000000..fb0a2cc --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide1.txt @@ -0,0 +1,24 @@ + ██████╗ ███████╗ █████╗ ██████╗████████╗██╗██╗ ██╗███████╗ + ██╔══██╗██╔════╝██╔══██╗██╔════╝╚══██╔══╝██║██║ ██║██╔════╝ + ██████╔╝█████╗ ███████║██║ ██║ ██║██║ ██║█████╗ + ██╔══██╗██╔══╝ ██╔══██║██║ ██║ ██║╚██╗ ██╔╝██╔══╝ + ██║ ██║███████╗██║ ██║╚██████╗ ██║ ██║ ╚████╔╝ ███████╗ + ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═══╝ ╚══════╝ + +██╗███╗ ███╗██████╗ ██╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗██╗ ██╗ █████╗ +██║████╗ ████║██╔══██╗██║ ██║ ██║ ██╔╝██╔══██╗██║ ██╔╝██║ ██╔╝██╔══██╗ +██║██╔████╔██║██████╔╝██║ ██║ █╗ ██║ ██╔╝ ███████║█████╔╝ █████╔╝ ███████║ +██║██║╚██╔╝██║██╔═══╝ ██║ ██║███╗██║ ██╔╝ ██╔══██║██╔═██╗ ██╔═██╗ ██╔══██║ +██║██║ ╚═╝ ██║██║ ███████╗ ╚███╔███╔╝██╔╝ ██║ ██║██║ ██╗██║ ██╗██║ ██║ +╚═╝╚═╝ ╚═╝╚═╝ ╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝ + _____ _ _ _ _ _ _ + | _ |_| |___ _____ | | | |___ ___ ___| |_|_| + | | . | .'| | | | | | .'| _|_ -| '_| | + |__|__|___|__,|_|_|_| |_____|__,|_| |___|_,_|_| + + _____ _ _ _ _ + |_ _|__ ___| |__ _ __ ___ | | ___ __ _ _ _| | | | ___ _ _ _ __ + | |/ _ \/ __| '_ \| '_ \ / _ \| |/ _ \ / _` | | | | |_| |/ _ \| | | | '__| + | | __/ (__| | | | | | | (_) | | (_) | (_| | |_| | _ | (_) | |_| | | + |_|\___|\___|_| |_|_| |_|\___/|_|\___/ \__, |\__, |_| |_|\___/ \__,_|_| + |___/ |___/ \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide2.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide2.txt new file mode 100644 index 0000000..6f64e1d --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide2.txt @@ -0,0 +1,17 @@ +Reactive Manifesto 2.0 + +* Responsive + - responds in a timely manner if at all possible + +* Resilient + - replication, containment, isolation, delegation + - components can fail independently + +* Elastic + - stays responsive under varying load + - increating/decreasing resources + +* Message Driven + - asynchronous communication + - loose coupling, location transparency + - load management, flow-control, back-pressure diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide3.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide3.txt new file mode 100644 index 0000000..84c213e --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide3.txt @@ -0,0 +1,9 @@ +About me + +* coder @ [SoftwareMill](http://www.softwaremill.com) + +* open-source: Supler, MacWire, Envers, ElasticMQ, Veripacks + +* [blog](http://www.warski.org) + +* [twitter](@adamwarski) diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide4.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide4.txt new file mode 100644 index 0000000..f22a281 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slide4.txt @@ -0,0 +1,16 @@ +"basic" Akka: + +* toolkit & runtime for building highly concurrent, distributed, + resilient, message-driven apps on the JVM + +* light-weight actors, communicating via messages + +* errors handling in actors + +And much more: + +* Remoting +* Cluster <- +* Persistance <- +* Streams <- +* HTTP diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideStickers.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideStickers.txt new file mode 100644 index 0000000..4858c03 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideStickers.txt @@ -0,0 +1,20 @@ + ███████╗████████╗██╗ ██████╗██╗ ██╗███████╗██████╗ ███████╗██╗██╗██╗ + ██╔════╝╚══██╔══╝██║██╔════╝██║ ██╔╝██╔════╝██╔══██╗██╔════╝██║██║██║ + ███████╗ ██║ ██║██║ █████╔╝ █████╗ ██████╔╝███████╗██║██║██║ + ╚════██║ ██║ ██║██║ ██╔═██╗ ██╔══╝ ██╔══██╗╚════██║╚═╝╚═╝╚═╝ + ███████║ ██║ ██║╚██████╗██║ ██╗███████╗██║ ██║███████║██╗██╗██╗ + ╚══════╝ ╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚══════╝╚═╝╚═╝╚═╝ + + + *Codebrag*: + + * Daily code review tool + * I'm proud of my code! + * http://www.codebrag.com/ + *ScalaTimes*: + + *Geecon conference* * Weekly Scala Newsletter + * http://www.scalatimes.com/ + * 13-15th May 2015 * @scalatimes + * http://geecon.org + * Warsaw, Poland \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideThanks.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideThanks.txt new file mode 100644 index 0000000..c78fd4a --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideThanks.txt @@ -0,0 +1,26 @@ +.___________. __ __ ___ .__ __. __ ___ +| || | | | / \ | \ | | | |/ / +`---| |----`| |__| | / ^ \ | \| | | ' / + | | | __ | / /_\ \ | . ` | | < + | | | | | | / _____ \ | |\ | | . \ + |__| |__| |__| /__/ \__\ |__| \__| |__|\__\ + + ____ ____ ______ __ __ + \ \ / / / __ \ | | | | + \ \/ / | | | | | | | | + \_ _/ | | | | | | | | + | | | `--' | | `--' | + |__| \______/ \______/ + + +* http://akka.io + +* http://www.reactive-streams.org + +* http://www.warski.org + +* https://github.com/adamw/reactive-akka-pres + +* https://github.com/adamw/reactmq + +* @adamwarski diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideTraits.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideTraits.txt new file mode 100644 index 0000000..b7f7e13 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideTraits.txt @@ -0,0 +1,9 @@ +Reactive manifesto traits: + +* Resilient: clustering, persistence + +* Elastic: streams + +* Message driven: actor communication + +==> Responsive \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideWhatsMore.txt b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideWhatsMore.txt new file mode 100644 index 0000000..f06bf1e --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/resources/slides/slideWhatsMore.txt @@ -0,0 +1,7 @@ +A bit more needed for a full application: + +1. discovering where the receiver runs via Cluster Client + +2. re-connect on error (Futures from Streams) + +3. snapshots (Persistence) \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/BytesPerSecondActor.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/BytesPerSecondActor.scala new file mode 100644 index 0000000..d5fa2ed --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/BytesPerSecondActor.scala @@ -0,0 +1,22 @@ +package com.softwaremill.reactive + +import akka.actor.Actor +import scala.concurrent.duration._ + +class BytesPerSecondActor extends Actor { + override def preStart() = { + import context.dispatcher + context.system.scheduler.schedule(1.second, 1.second, self, Tick) + } + + private var bytes = 0 + + override def receive = { + case Tick => + println(s"Bytes/second: $bytes") + bytes = 0 + case b: Int => bytes += b + } +} + +object Tick \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/ParseLinesStage.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/ParseLinesStage.scala new file mode 100644 index 0000000..aa66735 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/ParseLinesStage.scala @@ -0,0 +1,58 @@ +package com.softwaremill.reactive + +import akka.stream.stage.{Directive, Context, StatefulStage} +import akka.util.ByteString + +import scala.annotation.tailrec + +/** + * From http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html + */ +class ParseLinesStage(separator: String, maximumLineBytes: Int) extends StatefulStage[ByteString, String] { + private val separatorBytes = ByteString(separator) + private val firstSeparatorByte = separatorBytes.head + private var buffer = ByteString.empty + private var nextPossibleMatch = 0 + + def initial = new State { + override def onPush(chunk: ByteString, ctx: Context[String]): Directive = { + buffer ++= chunk + if (buffer.size > maximumLineBytes) { + println("XXX FAIL") + ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " + + s"which is more than $maximumLineBytes without seeing a line terminator")) + } else emit(doParse(Vector.empty).iterator, ctx) + } + + @tailrec + private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = { + val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch) + if (possibleMatchPos == -1) { + // No matching character, we need to accumulate more bytes into the buffer + nextPossibleMatch = buffer.size + parsedLinesSoFar + } else { + if (possibleMatchPos + separatorBytes.size > buffer.size) { + // We have found a possible match (we found the first character of the terminator + // sequence) but we don't have yet enough bytes. We remember the position to + // retry from next time. + nextPossibleMatch = possibleMatchPos + parsedLinesSoFar + } else { + if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) + == separatorBytes) { + // Found a match + val parsedLine = buffer.slice(0, possibleMatchPos).utf8String + buffer = buffer.drop(possibleMatchPos + separatorBytes.size) + nextPossibleMatch -= possibleMatchPos + separatorBytes.size + doParse(parsedLinesSoFar :+ parsedLine) + } else { + nextPossibleMatch += 1 + doParse(parsedLinesSoFar) + } + } + } + + } + } +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/LargestDelayActorComplete.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/LargestDelayActorComplete.scala new file mode 100644 index 0000000..5711236 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/LargestDelayActorComplete.scala @@ -0,0 +1,39 @@ +package com.softwaremill.reactive.complete + +import akka.persistence.PersistentActor +import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy} +import com.softwaremill.reactive._ + +class LargestDelayActorComplete extends PersistentActor with ActorSubscriber with Logging { + private var largestDelay: Option[FlightWithDelayPerMile] = None + + override def persistenceId = "flight-actor" + + private var inFlight = 0 + + override protected def requestStrategy = new MaxInFlightRequestStrategy(10) { + override def inFlightInternally = inFlight + } + + def receiveCommand = { + case OnNext(data: FlightData) => + FlightWithDelayPerMile(data).foreach { d => + inFlight += 1 + persistAsync(d) { _ => + processDelayData(d) + inFlight -= 1 + } + } + case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay) + } + + def receiveRecover = { + case d: FlightWithDelayPerMile => processDelayData(d) + } + + def processDelayData(d: FlightWithDelayPerMile): Unit = { + largestDelay = Some((d :: largestDelay.toList).max) + } +} + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/ReceiverComplete.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/ReceiverComplete.scala new file mode 100644 index 0000000..738fd66 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/ReceiverComplete.scala @@ -0,0 +1,88 @@ +package com.softwaremill.reactive.complete + +import java.net.InetSocketAddress + +import akka.actor.Actor.emptyBehavior +import akka.actor.{Actor, ActorSystem, PoisonPill, Props} +import akka.contrib.pattern.ClusterSingletonManager +import akka.stream.ActorFlowMaterializer +import akka.stream.actor.ActorSubscriber +import akka.stream.scaladsl.{Source, Sink, StreamTcp} +import com.softwaremill.reactive._ +import com.softwaremill.reactive.complete.ReceiverComplete.ReceiverClusterNode +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object ReceiverComplete { + + class Receiver(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging { + + def run(): Unit = { + implicit val mat = ActorFlowMaterializer() + + val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete]) + + logger.info("Receiver: binding to " + receiverAddress) + StreamTcp().bind(receiverAddress).runForeach { conn => + logger.info(s"Receiver: sender connected (${conn.remoteAddress})") + + val receiveSink = conn.flow + .transform(() => new ParseLinesStage("\n", 4000000)) + .filter(_.startsWith("20")) + .map(_.split(",")) + .mapConcat(FlightData(_).toList) + .to(Sink(ActorSubscriber[FlightData](largestDelayActor))) + + receiveSink.runWith(Source.empty) + } + + import system.dispatcher + system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay) + } + } + + class ReceiverClusterNode(clusterPort: Int) { + def run(): Unit = { + val conf = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$clusterPort") + .withFallback(ConfigFactory.load("cluster-receiver-template")) + + val system = ActorSystem("receiver", conf) + + system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[ReceiverNodeActor], clusterPort), + singletonName = "receiver", + terminationMessage = PoisonPill, + role = Some("receiver")), + name = "receiver-manager") + } + } + + class ReceiverNodeActor(clusterPort: Int) extends Actor { + val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10) + + override def preStart() = { + super.preStart() + new Receiver(receiverAddress)(context.system).run() + } + + override def receive = emptyBehavior + } +} + +object ClusteredReceiver1 extends App { + new ReceiverClusterNode(9171).run() +} + +object ClusteredReceiver2 extends App { + new ReceiverClusterNode(9172).run() +} + +object ClusteredReceiver3 extends App { + new ReceiverClusterNode(9173).run() +} + +object SimpleReceiver extends App { + implicit val system = ActorSystem() + new ReceiverComplete.Receiver(new InetSocketAddress("localhost", 9182)).run() +} \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/SenderComplete.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/SenderComplete.scala new file mode 100644 index 0000000..fe16b95 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/complete/SenderComplete.scala @@ -0,0 +1,38 @@ +package com.softwaremill.reactive.complete + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.softwaremill.reactive._ + +import scala.concurrent.duration._ + +object SenderComplete extends App with Logging { + implicit val system = ActorSystem() + val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181)) + + val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines() + + val linesSource = Source(getLines).map { line => ByteString(line + "\n") } + val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r)) + + val graph = FlowGraph.closed() { implicit b => + import FlowGraph.Implicits._ + + val broadcast = b.add(Broadcast[ByteString](2)) + + val logWindowFlow = Flow[ByteString] + .groupedWithin(10000, 1.seconds) + .map(group => group.map(_.size).foldLeft(0)(_ + _)) + .map(groupSize => logger.info(s"Sent $groupSize bytes")) + + linesSource ~> broadcast ~> serverConnection ~> logCompleteSink + broadcast ~> logWindowFlow ~> Sink.ignore + } + + implicit val mat = ActorFlowMaterializer() + graph.run() +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/model.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/model.scala new file mode 100644 index 0000000..f205545 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/model.scala @@ -0,0 +1,29 @@ +package com.softwaremill.reactive + +case class FlightData(from: String, to: String, distanceMiles: Int, delayMinutes: Int) + +object FlightData { + def apply(fields: Array[String]): Option[FlightData] = { + try { + Some(FlightData(fields(16), fields(17), fields(18).toInt, fields(14).toInt)) + } catch { + case _: Exception => None + } + } +} + +case class FlightWithDelayPerMile(flight: FlightData, delayPerMile: Double) extends Comparable[FlightWithDelayPerMile] { + override def compareTo(o: FlightWithDelayPerMile) = delayPerMile.compareTo(o.delayPerMile) +} + +object FlightWithDelayPerMile { + def apply(data: FlightData): Option[FlightWithDelayPerMile] = { + if (data.delayMinutes > 0) { + Some(FlightWithDelayPerMile(data, 60.0d * data.delayMinutes / data.distanceMiles)) + } else None + } +} + +object LogLargestDelay + +object GetReceiverAddress \ No newline at end of file diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/package.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/package.scala new file mode 100644 index 0000000..b2e539d --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/package.scala @@ -0,0 +1,7 @@ +package com.softwaremill + +import com.typesafe.scalalogging.slf4j.StrictLogging + +package object reactive { + type Logging = StrictLogging +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/ReceiverStep1.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/ReceiverStep1.scala new file mode 100644 index 0000000..a8efdf7 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/ReceiverStep1.scala @@ -0,0 +1,41 @@ +package com.softwaremill.reactive.step1 + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import com.softwaremill.reactive._ + +/** + * - flow from the client, transforming, no response + * - *elastic*: delay to see the backpressure + */ +class ReceiverStep1(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging { + + def run(): Unit = { + implicit val mat = ActorFlowMaterializer() + + logger.info("Receiver: binding to " + receiverAddress) + StreamTcp().bind(receiverAddress).runForeach { conn => + logger.info(s"Receiver: sender connected (${conn.remoteAddress})") + + val receiveSink = conn.flow + .transform(() => new ParseLinesStage("\n", 4000000)) + .filter(_.startsWith("20")) + .map(_.split(",")) + .mapConcat(FlightData(_).toList) + .to(Sink.foreach { flightData => + logger.info("Got data: " + flightData) + Thread.sleep(100L) + }) + + Source.empty.to(receiveSink).run() + } + } +} + +object ReceiverStep1 extends App { + implicit val system = ActorSystem() + new ReceiverStep1(new InetSocketAddress("localhost", 9182)).run() +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/SenderStep1.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/SenderStep1.scala new file mode 100644 index 0000000..2f3e668 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step1/SenderStep1.scala @@ -0,0 +1,37 @@ +package com.softwaremill.reactive.step1 + +import java.net.InetSocketAddress + +import akka.actor.{Props, ActorSystem} +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.softwaremill.reactive._ + +/** + * - source: iterator from file (lazy iterator) + * - sink: just log + * - server connnection: req -> resp flow + * - *message driven*: materializer: actors + */ +object SenderStep1 extends App with Logging { + implicit val system = ActorSystem() + val bytesPerSecondActor = system.actorOf(Props[BytesPerSecondActor]) + val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182)) + + val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines() + + val linesSource = Source(getLines).map { line => + bytesPerSecondActor ! line.length + ByteString(line + "\n") + } + val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r)) + + val flow = linesSource + .via(serverConnection) + .to(logCompleteSink) + + implicit val mat = ActorFlowMaterializer() + flow.run() +} + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/LargestDelayActorStep2.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/LargestDelayActorStep2.scala new file mode 100644 index 0000000..c49767e --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/LargestDelayActorStep2.scala @@ -0,0 +1,36 @@ +package com.softwaremill.reactive.step2 + +import akka.actor.Actor +import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy} +import com.softwaremill.reactive._ + +/** + * - actor subscriber: must handle OnNext() + * - counting how many messages are in-flight + * - specifying the request strategy + */ +class LargestDelayActorStep2 extends Actor with ActorSubscriber with Logging { + private var largestDelay: Option[FlightWithDelayPerMile] = None + + private var inFlight = 0 + + override protected def requestStrategy = new MaxInFlightRequestStrategy(10) { + override def inFlightInternally = inFlight + } + + def receive = { + case OnNext(data: FlightData) => + FlightWithDelayPerMile(data).foreach { d => + inFlight += 1 + processDelayData(d) + inFlight -= 1 + } + case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay) + } + + def processDelayData(d: FlightWithDelayPerMile): Unit = { + largestDelay = Some((d :: largestDelay.toList).max) + } +} + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/ReceiverStep2.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/ReceiverStep2.scala new file mode 100644 index 0000000..159f637 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/ReceiverStep2.scala @@ -0,0 +1,47 @@ +package com.softwaremill.reactive.step2 + +import java.net.InetSocketAddress + +import akka.actor.{ActorSystem, Props} +import akka.stream.ActorFlowMaterializer +import akka.stream.actor.ActorSubscriber +import akka.stream.scaladsl.{Source, Sink, StreamTcp} +import com.softwaremill.reactive._ +import com.softwaremill.reactive.complete.LargestDelayActorComplete + +import scala.concurrent.duration._ + +/** + * - sending data to an actor, which processes it further + * - the actor must be reactive + */ +class ReceiverStep2(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging { + + def run(): Unit = { + implicit val mat = ActorFlowMaterializer() + + val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete]) + + logger.info("Receiver: binding to " + receiverAddress) + StreamTcp().bind(receiverAddress).runForeach { conn => + logger.info(s"Receiver: sender connected (${conn.remoteAddress})") + + val receiveSink = conn.flow + .transform(() => new ParseLinesStage("\n", 4000000)) + .filter(_.startsWith("20")) + .map(_.split(",")) + .mapConcat(FlightData(_).toList) + .to(Sink(ActorSubscriber[FlightData](largestDelayActor))) + + Source.empty.to(receiveSink).run() + } + + import system.dispatcher + system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay) + } +} + +object ReceiverStep2 extends App { + implicit val system = ActorSystem() + new ReceiverStep2(new InetSocketAddress("localhost", 9182)).run() +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/SenderStep2.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/SenderStep2.scala new file mode 100644 index 0000000..2ef466c --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step2/SenderStep2.scala @@ -0,0 +1,42 @@ +package com.softwaremill.reactive.step2 + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.softwaremill.reactive._ + +import scala.concurrent.duration._ + +/** + * - graph: forking to count bytes sent, grouping within 1 second/10k entries, logging + */ +object SenderStep2 extends App with Logging { + implicit val system = ActorSystem() + val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182)) + + val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines() + + val linesSource = Source(getLines).map { line => ByteString(line + "\n") } + val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r)) + + val graph = FlowGraph.closed() { implicit b => + import FlowGraph.Implicits._ + + val broadcast = b.add(Broadcast[ByteString](2)) + + val logWindowFlow = Flow[ByteString] + .groupedWithin(10000, 1.seconds) + .map(group => group.map(_.size).foldLeft(0)(_ + _)) + .map(groupSize => logger.info(s"Sent $groupSize bytes")) + + linesSource ~> broadcast ~> serverConnection ~> logCompleteSink + broadcast ~> logWindowFlow ~> Sink.ignore + } + + implicit val mat = ActorFlowMaterializer() + graph.run() +} + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/LargestDelayActorStep3.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/LargestDelayActorStep3.scala new file mode 100644 index 0000000..8c64a41 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/LargestDelayActorStep3.scala @@ -0,0 +1,45 @@ +package com.softwaremill.reactive.step3 + +import akka.persistence.PersistentActor +import akka.stream.actor.ActorSubscriberMessage.OnNext +import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy} +import com.softwaremill.reactive._ + +/** + * - *resilient* + * - adding persistence: message is handled only when persist async completes + * - handler for recovery + * - extending persistent actor + */ +class LargestDelayActorStep3 extends PersistentActor with ActorSubscriber with Logging { + private var largestDelay: Option[FlightWithDelayPerMile] = None + + override def persistenceId = "flight-actor" + + private var inFlight = 0 + + override protected def requestStrategy = new MaxInFlightRequestStrategy(10) { + override def inFlightInternally = inFlight + } + + def receiveCommand = { + case OnNext(data: FlightData) => + FlightWithDelayPerMile(data).foreach { d => + inFlight += 1 + persistAsync(d) { _ => + processDelayData(d) + inFlight -= 1 + } + } + case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay) + } + + def receiveRecover = { + case d: FlightWithDelayPerMile => processDelayData(d) + } + + def processDelayData(d: FlightWithDelayPerMile): Unit = { + largestDelay = Some((d :: largestDelay.toList).max) + } + } + diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/ReceiverComplete.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/ReceiverComplete.scala new file mode 100644 index 0000000..fed03b0 --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/ReceiverComplete.scala @@ -0,0 +1,94 @@ +package com.softwaremill.reactive.step3 + +import java.net.InetSocketAddress + +import akka.actor.Actor.emptyBehavior +import akka.actor.{Actor, ActorSystem, PoisonPill, Props} +import akka.contrib.pattern.ClusterSingletonManager +import akka.stream.ActorFlowMaterializer +import akka.stream.actor.ActorSubscriber +import akka.stream.scaladsl.{Source, Sink, StreamTcp} +import com.softwaremill.reactive._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +/** + * - no changes + */ +class ReceiverStep3(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging { + + def run(): Unit = { + implicit val mat = ActorFlowMaterializer() + + val largestDelayActor = system.actorOf(Props[LargestDelayActorStep3]) + + logger.info("Receiver: binding to " + receiverAddress) + StreamTcp().bind(receiverAddress).runForeach { conn => + logger.info(s"Receiver: sender connected (${conn.remoteAddress})") + + val receiveSink = conn.flow + .transform(() => new ParseLinesStage("\n", 4000000)) + .filter(_.startsWith("20")) + .map(_.split(",")) + .mapConcat(FlightData(_).toList) + .to(Sink(ActorSubscriber[FlightData](largestDelayActor))) + + Source.empty.to(receiveSink).run() + } + + import system.dispatcher + system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay) + } +} + +object ReceiverStep3 extends App { + implicit val system = ActorSystem() + new ReceiverStep3(new InetSocketAddress("localhost", 9182)).run() +} + +/** + * - *resilient* + * - starting a cluster singleton + * - needs an actor which will be started/stopped + * - actor: starts the receiver + * - three apps to start the nodes + */ +class ReceiverClusterNodeStep3(clusterPort: Int) { + def run(): Unit = { + val conf = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$clusterPort") + .withFallback(ConfigFactory.load("cluster-receiver-template")) + + val system = ActorSystem("receiver", conf) + + system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[ReceiverNodeActorStep3], clusterPort), + singletonName = "receiver", + terminationMessage = PoisonPill, + role = Some("receiver")), + name = "receiver-manager") + } +} + +class ReceiverNodeActorStep3(clusterPort: Int) extends Actor { + val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10) + + override def preStart() = { + super.preStart() + new ReceiverStep3(receiverAddress)(context.system).run() + } + + override def receive = emptyBehavior +} + +object ClusteredReceiver1Step3 extends App { + new ReceiverClusterNodeStep3(9171).run() +} + +object ClusteredReceiver2Step3 extends App { + new ReceiverClusterNodeStep3(9172).run() +} + +object ClusteredReceiver3Step3 extends App { + new ReceiverClusterNodeStep3(9173).run() +} diff --git a/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/SenderStep3.scala b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/SenderStep3.scala new file mode 100644 index 0000000..4b68e8d --- /dev/null +++ b/2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/src/main/scala/com/softwaremill/reactive/step3/SenderStep3.scala @@ -0,0 +1,42 @@ +package com.softwaremill.reactive.step3 + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.stream.ActorFlowMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.softwaremill.reactive._ + +import scala.concurrent.duration._ + +/** + * - no changes + */ +object SenderStep3 extends App with Logging { + implicit val system = ActorSystem() + val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181)) + + val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines() + + val linesSource = Source(getLines).map { line => ByteString(line + "\n") } + val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r)) + + val graph = FlowGraph.closed() { implicit b => + import FlowGraph.Implicits._ + + val broadcast = b.add(Broadcast[ByteString](2)) + + val logWindowFlow = Flow[ByteString] + .groupedWithin(10000, 1.seconds) + .map(group => group.map(_.size).foldLeft(0)(_ + _)) + .map(groupSize => logger.info(s"Sent $groupSize bytes")) + + linesSource ~> broadcast ~> serverConnection ~> logCompleteSink + broadcast ~> logWindowFlow ~> Sink.ignore + } + + implicit val mat = ActorFlowMaterializer() + graph.run() +} +