mirror of
https://github.com/ysoftdevs/th.git
synced 2026-01-11 14:20:24 +01:00
Added Adam Warski, 1st aniversary talk
This commit is contained in:
18
2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/.gitignore
vendored
Normal file
18
2015_04_16_Adam_Warski_Implementing_the_Reactive_Manifesto_with_Akka/.gitignore
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
*.class
|
||||
*.log
|
||||
|
||||
# sbt specific
|
||||
.cache/
|
||||
.history/
|
||||
.lib/
|
||||
dist/*
|
||||
target/
|
||||
lib_managed/
|
||||
src_managed/
|
||||
project/boot/
|
||||
project/plugins/project/
|
||||
|
||||
# Scala-IDE specific
|
||||
.scala_dependencies
|
||||
.worksheet
|
||||
.idea
|
||||
@@ -0,0 +1,202 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "{}"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright {yyyy} {name of copyright owner}
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
Implementing the Reactive Manifesto with Akka
|
||||
---
|
||||
|
||||
Source obtained from [github.com/adamw/reactive-akka-pres](https://github.com/adamw/reactive-akka-pres/).
|
||||
|
||||
Dataset: http://stat-computing.org/dataexpo/2009/the-data.html
|
||||
@@ -0,0 +1 @@
|
||||
sbt.version=0.13.7
|
||||
@@ -0,0 +1,24 @@
|
||||
organization := "com.softwaremill"
|
||||
|
||||
name := "reactive-akka-pres"
|
||||
|
||||
version := "0.1-SNAPSHOT"
|
||||
|
||||
scalaVersion := "2.11.6"
|
||||
|
||||
val akkaVersion = "2.3.9"
|
||||
|
||||
libraryDependencies ++= Seq(
|
||||
// akka
|
||||
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
|
||||
"com.typesafe.akka" %% "akka-persistence-experimental" % akkaVersion,
|
||||
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
|
||||
"com.typesafe.akka" %% "akka-contrib" % akkaVersion,
|
||||
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M5",
|
||||
// util
|
||||
"com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
|
||||
"ch.qos.logback" % "logback-classic" % "1.1.3",
|
||||
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
|
||||
"joda-time" % "joda-time" % "2.7",
|
||||
"org.joda" % "joda-convert" % "1.7"
|
||||
)
|
||||
@@ -0,0 +1,12 @@
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
|
||||
remote.netty.tcp.port = 0
|
||||
remote.netty.tcp.hostname = "127.0.0.1"
|
||||
}
|
||||
|
||||
cluster.client.initial-contact-points = [
|
||||
"akka.tcp://receiver@127.0.0.1:9171",
|
||||
"akka.tcp://receiver@127.0.0.1:9172",
|
||||
"akka.tcp://receiver@127.0.0.1:9173"
|
||||
]
|
||||
@@ -0,0 +1,19 @@
|
||||
akka {
|
||||
actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
|
||||
remote.netty.tcp.port = 0 // should be overriden
|
||||
remote.netty.tcp.hostname = "127.0.0.1"
|
||||
|
||||
cluster {
|
||||
seed-nodes = [
|
||||
"akka.tcp://receiver@127.0.0.1:9171",
|
||||
"akka.tcp://receiver@127.0.0.1:9172",
|
||||
"akka.tcp://receiver@127.0.0.1:9173"
|
||||
]
|
||||
auto-down-unreachable-after = 10s
|
||||
roles = [ "receiver" ]
|
||||
role.receiver.min-nr-of-members = 2
|
||||
}
|
||||
|
||||
extensions = [ "akka.contrib.pattern.ClusterReceptionistExtension" ]
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
██████╗ ███████╗ █████╗ ██████╗████████╗██╗██╗ ██╗███████╗
|
||||
██╔══██╗██╔════╝██╔══██╗██╔════╝╚══██╔══╝██║██║ ██║██╔════╝
|
||||
██████╔╝█████╗ ███████║██║ ██║ ██║██║ ██║█████╗
|
||||
██╔══██╗██╔══╝ ██╔══██║██║ ██║ ██║╚██╗ ██╔╝██╔══╝
|
||||
██║ ██║███████╗██║ ██║╚██████╗ ██║ ██║ ╚████╔╝ ███████╗
|
||||
╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═══╝ ╚══════╝
|
||||
|
||||
██╗███╗ ███╗██████╗ ██╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗██╗ ██╗ █████╗
|
||||
██║████╗ ████║██╔══██╗██║ ██║ ██║ ██╔╝██╔══██╗██║ ██╔╝██║ ██╔╝██╔══██╗
|
||||
██║██╔████╔██║██████╔╝██║ ██║ █╗ ██║ ██╔╝ ███████║█████╔╝ █████╔╝ ███████║
|
||||
██║██║╚██╔╝██║██╔═══╝ ██║ ██║███╗██║ ██╔╝ ██╔══██║██╔═██╗ ██╔═██╗ ██╔══██║
|
||||
██║██║ ╚═╝ ██║██║ ███████╗ ╚███╔███╔╝██╔╝ ██║ ██║██║ ██╗██║ ██╗██║ ██║
|
||||
╚═╝╚═╝ ╚═╝╚═╝ ╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝
|
||||
_____ _ _ _ _ _ _
|
||||
| _ |_| |___ _____ | | | |___ ___ ___| |_|_|
|
||||
| | . | .'| | | | | | .'| _|_ -| '_| |
|
||||
|__|__|___|__,|_|_|_| |_____|__,|_| |___|_,_|_|
|
||||
|
||||
_____ _ _ _ _
|
||||
|_ _|__ ___| |__ _ __ ___ | | ___ __ _ _ _| | | | ___ _ _ _ __
|
||||
| |/ _ \/ __| '_ \| '_ \ / _ \| |/ _ \ / _` | | | | |_| |/ _ \| | | | '__|
|
||||
| | __/ (__| | | | | | | (_) | | (_) | (_| | |_| | _ | (_) | |_| | |
|
||||
|_|\___|\___|_| |_|_| |_|\___/|_|\___/ \__, |\__, |_| |_|\___/ \__,_|_|
|
||||
|___/ |___/
|
||||
@@ -0,0 +1,17 @@
|
||||
Reactive Manifesto 2.0
|
||||
|
||||
* Responsive
|
||||
- responds in a timely manner if at all possible
|
||||
|
||||
* Resilient
|
||||
- replication, containment, isolation, delegation
|
||||
- components can fail independently
|
||||
|
||||
* Elastic
|
||||
- stays responsive under varying load
|
||||
- increating/decreasing resources
|
||||
|
||||
* Message Driven
|
||||
- asynchronous communication
|
||||
- loose coupling, location transparency
|
||||
- load management, flow-control, back-pressure
|
||||
@@ -0,0 +1,9 @@
|
||||
About me
|
||||
|
||||
* coder @ [SoftwareMill](http://www.softwaremill.com)
|
||||
|
||||
* open-source: Supler, MacWire, Envers, ElasticMQ, Veripacks
|
||||
|
||||
* [blog](http://www.warski.org)
|
||||
|
||||
* [twitter](@adamwarski)
|
||||
@@ -0,0 +1,16 @@
|
||||
"basic" Akka:
|
||||
|
||||
* toolkit & runtime for building highly concurrent, distributed,
|
||||
resilient, message-driven apps on the JVM
|
||||
|
||||
* light-weight actors, communicating via messages
|
||||
|
||||
* errors handling in actors
|
||||
|
||||
And much more:
|
||||
|
||||
* Remoting
|
||||
* Cluster <-
|
||||
* Persistance <-
|
||||
* Streams <-
|
||||
* HTTP
|
||||
@@ -0,0 +1,20 @@
|
||||
███████╗████████╗██╗ ██████╗██╗ ██╗███████╗██████╗ ███████╗██╗██╗██╗
|
||||
██╔════╝╚══██╔══╝██║██╔════╝██║ ██╔╝██╔════╝██╔══██╗██╔════╝██║██║██║
|
||||
███████╗ ██║ ██║██║ █████╔╝ █████╗ ██████╔╝███████╗██║██║██║
|
||||
╚════██║ ██║ ██║██║ ██╔═██╗ ██╔══╝ ██╔══██╗╚════██║╚═╝╚═╝╚═╝
|
||||
███████║ ██║ ██║╚██████╗██║ ██╗███████╗██║ ██║███████║██╗██╗██╗
|
||||
╚══════╝ ╚═╝ ╚═╝ ╚═════╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚══════╝╚═╝╚═╝╚═╝
|
||||
|
||||
|
||||
*Codebrag*:
|
||||
|
||||
* Daily code review tool
|
||||
* I'm proud of my code!
|
||||
* http://www.codebrag.com/
|
||||
*ScalaTimes*:
|
||||
|
||||
*Geecon conference* * Weekly Scala Newsletter
|
||||
* http://www.scalatimes.com/
|
||||
* 13-15th May 2015 * @scalatimes
|
||||
* http://geecon.org
|
||||
* Warsaw, Poland
|
||||
@@ -0,0 +1,26 @@
|
||||
.___________. __ __ ___ .__ __. __ ___
|
||||
| || | | | / \ | \ | | | |/ /
|
||||
`---| |----`| |__| | / ^ \ | \| | | ' /
|
||||
| | | __ | / /_\ \ | . ` | | <
|
||||
| | | | | | / _____ \ | |\ | | . \
|
||||
|__| |__| |__| /__/ \__\ |__| \__| |__|\__\
|
||||
|
||||
____ ____ ______ __ __
|
||||
\ \ / / / __ \ | | | |
|
||||
\ \/ / | | | | | | | |
|
||||
\_ _/ | | | | | | | |
|
||||
| | | `--' | | `--' |
|
||||
|__| \______/ \______/
|
||||
|
||||
|
||||
* http://akka.io
|
||||
|
||||
* http://www.reactive-streams.org
|
||||
|
||||
* http://www.warski.org
|
||||
|
||||
* https://github.com/adamw/reactive-akka-pres
|
||||
|
||||
* https://github.com/adamw/reactmq
|
||||
|
||||
* @adamwarski
|
||||
@@ -0,0 +1,9 @@
|
||||
Reactive manifesto traits:
|
||||
|
||||
* Resilient: clustering, persistence
|
||||
|
||||
* Elastic: streams
|
||||
|
||||
* Message driven: actor communication
|
||||
|
||||
==> Responsive
|
||||
@@ -0,0 +1,7 @@
|
||||
A bit more needed for a full application:
|
||||
|
||||
1. discovering where the receiver runs via Cluster Client
|
||||
|
||||
2. re-connect on error (Futures from Streams)
|
||||
|
||||
3. snapshots (Persistence)
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.softwaremill.reactive
|
||||
|
||||
import akka.actor.Actor
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class BytesPerSecondActor extends Actor {
|
||||
override def preStart() = {
|
||||
import context.dispatcher
|
||||
context.system.scheduler.schedule(1.second, 1.second, self, Tick)
|
||||
}
|
||||
|
||||
private var bytes = 0
|
||||
|
||||
override def receive = {
|
||||
case Tick =>
|
||||
println(s"Bytes/second: $bytes")
|
||||
bytes = 0
|
||||
case b: Int => bytes += b
|
||||
}
|
||||
}
|
||||
|
||||
object Tick
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.softwaremill.reactive
|
||||
|
||||
import akka.stream.stage.{Directive, Context, StatefulStage}
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* From http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html
|
||||
*/
|
||||
class ParseLinesStage(separator: String, maximumLineBytes: Int) extends StatefulStage[ByteString, String] {
|
||||
private val separatorBytes = ByteString(separator)
|
||||
private val firstSeparatorByte = separatorBytes.head
|
||||
private var buffer = ByteString.empty
|
||||
private var nextPossibleMatch = 0
|
||||
|
||||
def initial = new State {
|
||||
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
|
||||
buffer ++= chunk
|
||||
if (buffer.size > maximumLineBytes) {
|
||||
println("XXX FAIL")
|
||||
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
|
||||
s"which is more than $maximumLineBytes without seeing a line terminator"))
|
||||
} else emit(doParse(Vector.empty).iterator, ctx)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
|
||||
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
|
||||
if (possibleMatchPos == -1) {
|
||||
// No matching character, we need to accumulate more bytes into the buffer
|
||||
nextPossibleMatch = buffer.size
|
||||
parsedLinesSoFar
|
||||
} else {
|
||||
if (possibleMatchPos + separatorBytes.size > buffer.size) {
|
||||
// We have found a possible match (we found the first character of the terminator
|
||||
// sequence) but we don't have yet enough bytes. We remember the position to
|
||||
// retry from next time.
|
||||
nextPossibleMatch = possibleMatchPos
|
||||
parsedLinesSoFar
|
||||
} else {
|
||||
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
|
||||
== separatorBytes) {
|
||||
// Found a match
|
||||
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
|
||||
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
|
||||
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
|
||||
doParse(parsedLinesSoFar :+ parsedLine)
|
||||
} else {
|
||||
nextPossibleMatch += 1
|
||||
doParse(parsedLinesSoFar)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.softwaremill.reactive.complete
|
||||
|
||||
import akka.persistence.PersistentActor
|
||||
import akka.stream.actor.ActorSubscriberMessage.OnNext
|
||||
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
class LargestDelayActorComplete extends PersistentActor with ActorSubscriber with Logging {
|
||||
private var largestDelay: Option[FlightWithDelayPerMile] = None
|
||||
|
||||
override def persistenceId = "flight-actor"
|
||||
|
||||
private var inFlight = 0
|
||||
|
||||
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
|
||||
override def inFlightInternally = inFlight
|
||||
}
|
||||
|
||||
def receiveCommand = {
|
||||
case OnNext(data: FlightData) =>
|
||||
FlightWithDelayPerMile(data).foreach { d =>
|
||||
inFlight += 1
|
||||
persistAsync(d) { _ =>
|
||||
processDelayData(d)
|
||||
inFlight -= 1
|
||||
}
|
||||
}
|
||||
case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay)
|
||||
}
|
||||
|
||||
def receiveRecover = {
|
||||
case d: FlightWithDelayPerMile => processDelayData(d)
|
||||
}
|
||||
|
||||
def processDelayData(d: FlightWithDelayPerMile): Unit = {
|
||||
largestDelay = Some((d :: largestDelay.toList).max)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
package com.softwaremill.reactive.complete
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.Actor.emptyBehavior
|
||||
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
|
||||
import akka.contrib.pattern.ClusterSingletonManager
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
|
||||
import com.softwaremill.reactive._
|
||||
import com.softwaremill.reactive.complete.ReceiverComplete.ReceiverClusterNode
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object ReceiverComplete {
|
||||
|
||||
class Receiver(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
|
||||
|
||||
def run(): Unit = {
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
||||
val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete])
|
||||
|
||||
logger.info("Receiver: binding to " + receiverAddress)
|
||||
StreamTcp().bind(receiverAddress).runForeach { conn =>
|
||||
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")
|
||||
|
||||
val receiveSink = conn.flow
|
||||
.transform(() => new ParseLinesStage("\n", 4000000))
|
||||
.filter(_.startsWith("20"))
|
||||
.map(_.split(","))
|
||||
.mapConcat(FlightData(_).toList)
|
||||
.to(Sink(ActorSubscriber[FlightData](largestDelayActor)))
|
||||
|
||||
receiveSink.runWith(Source.empty)
|
||||
}
|
||||
|
||||
import system.dispatcher
|
||||
system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay)
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiverClusterNode(clusterPort: Int) {
|
||||
def run(): Unit = {
|
||||
val conf = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$clusterPort")
|
||||
.withFallback(ConfigFactory.load("cluster-receiver-template"))
|
||||
|
||||
val system = ActorSystem("receiver", conf)
|
||||
|
||||
system.actorOf(ClusterSingletonManager.props(
|
||||
singletonProps = Props(classOf[ReceiverNodeActor], clusterPort),
|
||||
singletonName = "receiver",
|
||||
terminationMessage = PoisonPill,
|
||||
role = Some("receiver")),
|
||||
name = "receiver-manager")
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiverNodeActor(clusterPort: Int) extends Actor {
|
||||
val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10)
|
||||
|
||||
override def preStart() = {
|
||||
super.preStart()
|
||||
new Receiver(receiverAddress)(context.system).run()
|
||||
}
|
||||
|
||||
override def receive = emptyBehavior
|
||||
}
|
||||
}
|
||||
|
||||
object ClusteredReceiver1 extends App {
|
||||
new ReceiverClusterNode(9171).run()
|
||||
}
|
||||
|
||||
object ClusteredReceiver2 extends App {
|
||||
new ReceiverClusterNode(9172).run()
|
||||
}
|
||||
|
||||
object ClusteredReceiver3 extends App {
|
||||
new ReceiverClusterNode(9173).run()
|
||||
}
|
||||
|
||||
object SimpleReceiver extends App {
|
||||
implicit val system = ActorSystem()
|
||||
new ReceiverComplete.Receiver(new InetSocketAddress("localhost", 9182)).run()
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.softwaremill.reactive.complete
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object SenderComplete extends App with Logging {
|
||||
implicit val system = ActorSystem()
|
||||
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181))
|
||||
|
||||
val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()
|
||||
|
||||
val linesSource = Source(getLines).map { line => ByteString(line + "\n") }
|
||||
val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r))
|
||||
|
||||
val graph = FlowGraph.closed() { implicit b =>
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val broadcast = b.add(Broadcast[ByteString](2))
|
||||
|
||||
val logWindowFlow = Flow[ByteString]
|
||||
.groupedWithin(10000, 1.seconds)
|
||||
.map(group => group.map(_.size).foldLeft(0)(_ + _))
|
||||
.map(groupSize => logger.info(s"Sent $groupSize bytes"))
|
||||
|
||||
linesSource ~> broadcast ~> serverConnection ~> logCompleteSink
|
||||
broadcast ~> logWindowFlow ~> Sink.ignore
|
||||
}
|
||||
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
graph.run()
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package com.softwaremill.reactive
|
||||
|
||||
case class FlightData(from: String, to: String, distanceMiles: Int, delayMinutes: Int)
|
||||
|
||||
object FlightData {
|
||||
def apply(fields: Array[String]): Option[FlightData] = {
|
||||
try {
|
||||
Some(FlightData(fields(16), fields(17), fields(18).toInt, fields(14).toInt))
|
||||
} catch {
|
||||
case _: Exception => None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class FlightWithDelayPerMile(flight: FlightData, delayPerMile: Double) extends Comparable[FlightWithDelayPerMile] {
|
||||
override def compareTo(o: FlightWithDelayPerMile) = delayPerMile.compareTo(o.delayPerMile)
|
||||
}
|
||||
|
||||
object FlightWithDelayPerMile {
|
||||
def apply(data: FlightData): Option[FlightWithDelayPerMile] = {
|
||||
if (data.delayMinutes > 0) {
|
||||
Some(FlightWithDelayPerMile(data, 60.0d * data.delayMinutes / data.distanceMiles))
|
||||
} else None
|
||||
}
|
||||
}
|
||||
|
||||
object LogLargestDelay
|
||||
|
||||
object GetReceiverAddress
|
||||
@@ -0,0 +1,7 @@
|
||||
package com.softwaremill
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.StrictLogging
|
||||
|
||||
package object reactive {
|
||||
type Logging = StrictLogging
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.softwaremill.reactive.step1
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
/**
|
||||
* - flow from the client, transforming, no response
|
||||
* - *elastic*: delay to see the backpressure
|
||||
*/
|
||||
class ReceiverStep1(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
|
||||
|
||||
def run(): Unit = {
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
||||
logger.info("Receiver: binding to " + receiverAddress)
|
||||
StreamTcp().bind(receiverAddress).runForeach { conn =>
|
||||
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")
|
||||
|
||||
val receiveSink = conn.flow
|
||||
.transform(() => new ParseLinesStage("\n", 4000000))
|
||||
.filter(_.startsWith("20"))
|
||||
.map(_.split(","))
|
||||
.mapConcat(FlightData(_).toList)
|
||||
.to(Sink.foreach { flightData =>
|
||||
logger.info("Got data: " + flightData)
|
||||
Thread.sleep(100L)
|
||||
})
|
||||
|
||||
Source.empty.to(receiveSink).run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object ReceiverStep1 extends App {
|
||||
implicit val system = ActorSystem()
|
||||
new ReceiverStep1(new InetSocketAddress("localhost", 9182)).run()
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.softwaremill.reactive.step1
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{Props, ActorSystem}
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
/**
|
||||
* - source: iterator from file (lazy iterator)
|
||||
* - sink: just log
|
||||
* - server connnection: req -> resp flow
|
||||
* - *message driven*: materializer: actors
|
||||
*/
|
||||
object SenderStep1 extends App with Logging {
|
||||
implicit val system = ActorSystem()
|
||||
val bytesPerSecondActor = system.actorOf(Props[BytesPerSecondActor])
|
||||
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182))
|
||||
|
||||
val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()
|
||||
|
||||
val linesSource = Source(getLines).map { line =>
|
||||
bytesPerSecondActor ! line.length
|
||||
ByteString(line + "\n")
|
||||
}
|
||||
val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r))
|
||||
|
||||
val flow = linesSource
|
||||
.via(serverConnection)
|
||||
.to(logCompleteSink)
|
||||
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
flow.run()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.softwaremill.reactive.step2
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.stream.actor.ActorSubscriberMessage.OnNext
|
||||
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
/**
|
||||
* - actor subscriber: must handle OnNext()
|
||||
* - counting how many messages are in-flight
|
||||
* - specifying the request strategy
|
||||
*/
|
||||
class LargestDelayActorStep2 extends Actor with ActorSubscriber with Logging {
|
||||
private var largestDelay: Option[FlightWithDelayPerMile] = None
|
||||
|
||||
private var inFlight = 0
|
||||
|
||||
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
|
||||
override def inFlightInternally = inFlight
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case OnNext(data: FlightData) =>
|
||||
FlightWithDelayPerMile(data).foreach { d =>
|
||||
inFlight += 1
|
||||
processDelayData(d)
|
||||
inFlight -= 1
|
||||
}
|
||||
case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay)
|
||||
}
|
||||
|
||||
def processDelayData(d: FlightWithDelayPerMile): Unit = {
|
||||
largestDelay = Some((d :: largestDelay.toList).max)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.softwaremill.reactive.step2
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{ActorSystem, Props}
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
|
||||
import com.softwaremill.reactive._
|
||||
import com.softwaremill.reactive.complete.LargestDelayActorComplete
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* - sending data to an actor, which processes it further
|
||||
* - the actor must be reactive
|
||||
*/
|
||||
class ReceiverStep2(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
|
||||
|
||||
def run(): Unit = {
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
||||
val largestDelayActor = system.actorOf(Props[LargestDelayActorComplete])
|
||||
|
||||
logger.info("Receiver: binding to " + receiverAddress)
|
||||
StreamTcp().bind(receiverAddress).runForeach { conn =>
|
||||
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")
|
||||
|
||||
val receiveSink = conn.flow
|
||||
.transform(() => new ParseLinesStage("\n", 4000000))
|
||||
.filter(_.startsWith("20"))
|
||||
.map(_.split(","))
|
||||
.mapConcat(FlightData(_).toList)
|
||||
.to(Sink(ActorSubscriber[FlightData](largestDelayActor)))
|
||||
|
||||
Source.empty.to(receiveSink).run()
|
||||
}
|
||||
|
||||
import system.dispatcher
|
||||
system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay)
|
||||
}
|
||||
}
|
||||
|
||||
object ReceiverStep2 extends App {
|
||||
implicit val system = ActorSystem()
|
||||
new ReceiverStep2(new InetSocketAddress("localhost", 9182)).run()
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.softwaremill.reactive.step2
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* - graph: forking to count bytes sent, grouping within 1 second/10k entries, logging
|
||||
*/
|
||||
object SenderStep2 extends App with Logging {
|
||||
implicit val system = ActorSystem()
|
||||
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9182))
|
||||
|
||||
val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()
|
||||
|
||||
val linesSource = Source(getLines).map { line => ByteString(line + "\n") }
|
||||
val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r))
|
||||
|
||||
val graph = FlowGraph.closed() { implicit b =>
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val broadcast = b.add(Broadcast[ByteString](2))
|
||||
|
||||
val logWindowFlow = Flow[ByteString]
|
||||
.groupedWithin(10000, 1.seconds)
|
||||
.map(group => group.map(_.size).foldLeft(0)(_ + _))
|
||||
.map(groupSize => logger.info(s"Sent $groupSize bytes"))
|
||||
|
||||
linesSource ~> broadcast ~> serverConnection ~> logCompleteSink
|
||||
broadcast ~> logWindowFlow ~> Sink.ignore
|
||||
}
|
||||
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
graph.run()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.softwaremill.reactive.step3
|
||||
|
||||
import akka.persistence.PersistentActor
|
||||
import akka.stream.actor.ActorSubscriberMessage.OnNext
|
||||
import akka.stream.actor.{ActorSubscriber, MaxInFlightRequestStrategy}
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
/**
|
||||
* - *resilient*
|
||||
* - adding persistence: message is handled only when persist async completes
|
||||
* - handler for recovery
|
||||
* - extending persistent actor
|
||||
*/
|
||||
class LargestDelayActorStep3 extends PersistentActor with ActorSubscriber with Logging {
|
||||
private var largestDelay: Option[FlightWithDelayPerMile] = None
|
||||
|
||||
override def persistenceId = "flight-actor"
|
||||
|
||||
private var inFlight = 0
|
||||
|
||||
override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
|
||||
override def inFlightInternally = inFlight
|
||||
}
|
||||
|
||||
def receiveCommand = {
|
||||
case OnNext(data: FlightData) =>
|
||||
FlightWithDelayPerMile(data).foreach { d =>
|
||||
inFlight += 1
|
||||
persistAsync(d) { _ =>
|
||||
processDelayData(d)
|
||||
inFlight -= 1
|
||||
}
|
||||
}
|
||||
case LogLargestDelay => logger.info("Largest delay so far: " + largestDelay)
|
||||
}
|
||||
|
||||
def receiveRecover = {
|
||||
case d: FlightWithDelayPerMile => processDelayData(d)
|
||||
}
|
||||
|
||||
def processDelayData(d: FlightWithDelayPerMile): Unit = {
|
||||
largestDelay = Some((d :: largestDelay.toList).max)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.softwaremill.reactive.step3
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.Actor.emptyBehavior
|
||||
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
|
||||
import akka.contrib.pattern.ClusterSingletonManager
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.scaladsl.{Source, Sink, StreamTcp}
|
||||
import com.softwaremill.reactive._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* - no changes
|
||||
*/
|
||||
class ReceiverStep3(receiverAddress: InetSocketAddress)(implicit val system: ActorSystem) extends Logging {
|
||||
|
||||
def run(): Unit = {
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
|
||||
val largestDelayActor = system.actorOf(Props[LargestDelayActorStep3])
|
||||
|
||||
logger.info("Receiver: binding to " + receiverAddress)
|
||||
StreamTcp().bind(receiverAddress).runForeach { conn =>
|
||||
logger.info(s"Receiver: sender connected (${conn.remoteAddress})")
|
||||
|
||||
val receiveSink = conn.flow
|
||||
.transform(() => new ParseLinesStage("\n", 4000000))
|
||||
.filter(_.startsWith("20"))
|
||||
.map(_.split(","))
|
||||
.mapConcat(FlightData(_).toList)
|
||||
.to(Sink(ActorSubscriber[FlightData](largestDelayActor)))
|
||||
|
||||
Source.empty.to(receiveSink).run()
|
||||
}
|
||||
|
||||
import system.dispatcher
|
||||
system.scheduler.schedule(0.seconds, 1.second, largestDelayActor, LogLargestDelay)
|
||||
}
|
||||
}
|
||||
|
||||
object ReceiverStep3 extends App {
|
||||
implicit val system = ActorSystem()
|
||||
new ReceiverStep3(new InetSocketAddress("localhost", 9182)).run()
|
||||
}
|
||||
|
||||
/**
|
||||
* - *resilient*
|
||||
* - starting a cluster singleton
|
||||
* - needs an actor which will be started/stopped
|
||||
* - actor: starts the receiver
|
||||
* - three apps to start the nodes
|
||||
*/
|
||||
class ReceiverClusterNodeStep3(clusterPort: Int) {
|
||||
def run(): Unit = {
|
||||
val conf = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$clusterPort")
|
||||
.withFallback(ConfigFactory.load("cluster-receiver-template"))
|
||||
|
||||
val system = ActorSystem("receiver", conf)
|
||||
|
||||
system.actorOf(ClusterSingletonManager.props(
|
||||
singletonProps = Props(classOf[ReceiverNodeActorStep3], clusterPort),
|
||||
singletonName = "receiver",
|
||||
terminationMessage = PoisonPill,
|
||||
role = Some("receiver")),
|
||||
name = "receiver-manager")
|
||||
}
|
||||
}
|
||||
|
||||
class ReceiverNodeActorStep3(clusterPort: Int) extends Actor {
|
||||
val receiverAddress = new InetSocketAddress("localhost", clusterPort + 10)
|
||||
|
||||
override def preStart() = {
|
||||
super.preStart()
|
||||
new ReceiverStep3(receiverAddress)(context.system).run()
|
||||
}
|
||||
|
||||
override def receive = emptyBehavior
|
||||
}
|
||||
|
||||
object ClusteredReceiver1Step3 extends App {
|
||||
new ReceiverClusterNodeStep3(9171).run()
|
||||
}
|
||||
|
||||
object ClusteredReceiver2Step3 extends App {
|
||||
new ReceiverClusterNodeStep3(9172).run()
|
||||
}
|
||||
|
||||
object ClusteredReceiver3Step3 extends App {
|
||||
new ReceiverClusterNodeStep3(9173).run()
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.softwaremill.reactive.step3
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorFlowMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
import com.softwaremill.reactive._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* - no changes
|
||||
*/
|
||||
object SenderStep3 extends App with Logging {
|
||||
implicit val system = ActorSystem()
|
||||
val serverConnection = StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 9181))
|
||||
|
||||
val getLines = () => scala.io.Source.fromFile("/Users/adamw/projects/reactive-akka-pres/data/2008.csv").getLines()
|
||||
|
||||
val linesSource = Source(getLines).map { line => ByteString(line + "\n") }
|
||||
val logCompleteSink = Sink.onComplete(r => logger.info("Completed with: " + r))
|
||||
|
||||
val graph = FlowGraph.closed() { implicit b =>
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val broadcast = b.add(Broadcast[ByteString](2))
|
||||
|
||||
val logWindowFlow = Flow[ByteString]
|
||||
.groupedWithin(10000, 1.seconds)
|
||||
.map(group => group.map(_.size).foldLeft(0)(_ + _))
|
||||
.map(groupSize => logger.info(s"Sent $groupSize bytes"))
|
||||
|
||||
linesSource ~> broadcast ~> serverConnection ~> logCompleteSink
|
||||
broadcast ~> logWindowFlow ~> Sink.ignore
|
||||
}
|
||||
|
||||
implicit val mat = ActorFlowMaterializer()
|
||||
graph.run()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user