Today we return to one of the topics covered in the excellent book Reactive Design Patterns. We will talk about Akka Streams and streaming data in general; in Roland Kuhn's book, chapter 10 and chapters 15 through 17 are devoted to these questions.
Reactive Streams is a standard way of streaming data asynchronously. It was included in Java 9 as the java.util.concurrent.Flow interfaces and is now becoming a real lifesaver for building streaming components in all kinds of applications, a position it is likely to keep. Note that Reactive Streams is "just" a standard and is of no use on its own. In practice you work with one concrete implementation of the standard or another. Today we look at Akka Streams, one of the first implementations of Reactive Streams.
Context
A typical stream processing pipeline consists of several steps, each of which passes data on to the next one (that is, downstream). If we take two adjacent steps and treat the upstream one as the supplier and the next one as the consumer of data, the supplier may turn out to work either slower or faster than the consumer. When the supplier is the slower one, all is well, but things get complicated when the consumer cannot keep up with the supplier. In that case the consumer can be flooded with data that it has to process as carefully as it can.
The simplest way to deal with excess data is to take everything that cannot be processed and throw it away. This is exactly what happens, for example, in networking hardware. But what if we do not want to throw anything away? That is where backpressure comes in.
The concept of backpressure is central to Reactive Streams: it limits the amount of data transferred between adjacent links of the pipeline so that no link is overflowed. Since the most important aspect of the reactive approach is to avoid blocking unless absolutely necessary, the implementation of backpressure in a reactive stream must itself be non-blocking.
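To make the idea of demand signalling concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces together with SubmissionPublisher. It is not Akka Streams code, and the names BackpressureSketch, OneAtATime, and run are invented for this illustration. The subscriber asks for one element, handles it, and only then asks for the next, so it is never handed more than it requested.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {

    /** Subscriber that signals demand for exactly one element at a time. */
    static class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // buffered until the subscriber signals demand
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }
}
```

Calling BackpressureSketch.run(5) returns the five published numbers in order. The demand travels upstream through request(n) calls rather than through a blocked thread on the consumer side, which is the non-blocking property described above.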
How it works
The Reactive Streams standard defines a number of interfaces but no implementation as such. This means that simply adding a dependency on org.reactivestreams:reactive-streams gets us nowhere: we still need a concrete implementation. There are many implementations of Reactive Streams; in this article we use Akka Streams and its Java DSL. Other implementations include RxJava 2.x and Reactor.
Use case
Suppose we have a directory that we want to watch for new CSV files. Each file should be processed as a stream, aggregated on the fly, and the collected results sent to a web socket in real time. In addition, we want a threshold on the accumulated aggregated data that triggers an email notification once it is reached.
In this example each CSV line contains a pair (id, value), and the id changes every two lines. For example:
370582,0.17870700247256666
370582,0.5262255382633264
441876,0.30998025265909457
441876,0.3141591265785087
722246,0.7334219632071504
722246,0.5310146239777006
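We want to compute the average value of the two lines that share an id and send it to the web socket only if it exceeds 0.9. In addition, we want to send an email notification after every fifth value that reaches the web socket. Finally, we want to read and display the data received from the web socket, which is done by a trivial frontend written in JavaScript.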
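Before bringing in any streaming library, the aggregation rule itself can be written down in a few lines of plain Java. This is only a sketch of the requirement, not the article's implementation; the class name PairAverages and the method averagesAbove are invented here, and the sketch assumes that a trailing unpaired line is simply ignored.

```java
import java.util.ArrayList;
import java.util.List;

class PairAverages {

    /**
     * Averages the values of each consecutive pair of "id,value" lines and
     * keeps only the averages strictly above the threshold.
     */
    static List<Double> averagesAbove(List<String> csvLines, double threshold) {
        List<Double> result = new ArrayList<>();
        // Step through the lines two at a time; a trailing unpaired line is ignored.
        for (int i = 0; i + 1 < csvLines.size(); i += 2) {
            double first = Double.parseDouble(csvLines.get(i).split(",")[1]);
            double second = Double.parseDouble(csvLines.get(i + 1).split(",")[1]);
            double average = (first + second) / 2;
            if (average > threshold) {
                result.add(average);
            }
        }
        return result;
    }
}
```

For the six sample rows above, the three averages are roughly 0.35, 0.31, and 0.63, so with the 0.9 threshold nothing would be sent to the web socket.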
Architecture
We will use a number of tools from the Akka ecosystem (see Fig. 1). Naturally, Akka Streams sits at the center of the whole system and lets us process data in real time in a streaming fashion. To read the CSV files we use Alpakka, a set of connectors for integrating Akka Streams with various technologies, protocols, and libraries. Interestingly, because Akka Streams are Reactive Streams, the whole Alpakka ecosystem is available to any other RS implementation as well; this is exactly the interoperability that the RS interfaces were designed to provide. Finally, we use Akka HTTP to expose the web socket endpoint. The best part is that Akka HTTP integrates seamlessly with Akka Streams (it actually uses them under the hood), so serving a stream as a web socket is not difficult.
Fig. 1. Architecture overview
If you compare this scheme with a classic Java EE architecture, you will probably notice that everything here is much simpler. There are no containers and no beans, just a simple standalone application. Moreover, the Java EE stack does not support the streaming approach at all.
Akka Streams basics
In Akka Streams, a processing pipeline (a graph) is built from three kinds of elements: Source (where data comes from), Sink (where data ends up), and Flow (a processing step).
From these components we define our graph, which is essentially just a recipe for processing data; no computation takes place at this point. For the pipeline to do any work, the graph has to be materialized, that is, turned into a runnable form. This requires a so-called materializer, which optimizes the graph definition and ultimately runs it. The built-in ActorMaterializer has virtually no competition, though, so you are unlikely to use any other implementation.
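The split between describing a pipeline and running it can be illustrated with an analogy from the JDK. The sketch below uses java.util.stream rather than Akka Streams, and the class name BlueprintAnalogy is invented for it: intermediate operations such as map are only recorded, and nothing executes until a terminal operation runs. The analogy is loose, since a java.util.stream pipeline can be consumed only once, whereas an Akka Streams graph is a blueprint that can be materialized repeatedly.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlueprintAnalogy {

    /** Returns {calls before the terminal operation, calls after it, elements collected}. */
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();

        // Describing the pipeline: map() is only recorded, not executed.
        Stream<Integer> described = Stream.of("1", "2", "3")
                .map(s -> {
                    calls.incrementAndGet();
                    return Integer.parseInt(s);
                });
        int before = calls.get();

        // The terminal operation plays the role of "running" the description.
        List<Integer> values = described.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after, values.size()};
    }
}
```

The mapping function has not been called at all when the pipeline is merely described; it runs once per element only when the terminal operation is invoked.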
If you look closely at the type parameters of the components, you will notice that each component has, besides its input and output types, a mysterious type Mat. It refers to the so-called materialized value: a value that is accessible from outside the graph, in contrast to the input and output types, which are only used for internal communication between graph steps (see Fig. 2). If we want to ignore the materialized value, which happens quite often when we only care about passing data between graph steps, there is a special type parameter for that: NotUsed. It can be compared to Java's Void, but it carries a little more meaning: "we do not use this value" is more informative than Void. Some APIs also use a similar type, Done, to signal that a particular task has completed. Other Java libraries would probably use Void in both cases, but Akka Streams tries to get the most out of every type by giving it useful semantics.
Fig. 2. The type parameters of Flow
Application
Now let's move on to a concrete implementation of the CSV processor. First we define the Akka Streams graph, and then we use Akka HTTP to connect the stream to a web socket.
Components of the streaming pipeline
At the entry point of our streaming pipeline we want to watch the target directory for new CSV files. We would like to use java.nio.file.WatchService for this, but since this is a streaming application we need a source of events (a Source) to work with, rather than wiring everything together through callbacks. Fortunately, such a Source is already available in Alpakka as one of its connectors: DirectoryChangesSource, part of alpakka-file, which uses WatchService under the hood:
private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles = DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128);
This gives us a source that emits elements of type Pair<Path, DirectoryChange>. We filter them so that only newly created CSV files are selected and passed downstream. For this transformation and all the following ones we use small elements called Flow, from which the full processing pipeline is then assembled:
private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths =
    Flow.<Pair<Path, DirectoryChange>>create()
        .filter(this::isCsvFileCreationEvent)
        .map(Pair::first);

private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) {
    return p.first().toString().endsWith(".csv")
        && p.second().equals(DirectoryChange.Creation);
}
A Flow can be created, for example, with the generic create() method, which is useful when the input type is itself generic. Here the resulting stream emits, as a Path, every new CSV file that appears in DATA_DIR.
Next we want to turn each Path into the stream of lines read from that file. To turn a source into another source you can use one of the flatMap* methods (flatMapConcat or flatMapMerge). In both cases we create a Source from each input element and combine the resulting sources into a new, single source, either by concatenating them or by merging them. In our case we go with flatMapConcat, because we want to preserve the order of the lines so that lines with the same id stay next to each other. To turn a Path into a stream of bytes we use the built-in FileIO utility:
private final Flow<Path, ByteString, NotUsed> fileBytes = Flow.of(Path.class).flatMapConcat(FileIO::fromPath);
This time we use the of() method to create the new flow; it is convenient when the input type is not generic.
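Why concatenation matters for this use case can be shown with a small plain-Java analogy. It uses java.util.stream's flatMap instead of Akka's flatMapConcat, and the class name ConcatOrder is invented for the sketch: each inner list stands in for the lines of one file and is drained completely before the next one starts.

```java
import java.util.List;
import java.util.stream.Collectors;

class ConcatOrder {

    /** Flattens per-file line lists one file after another, keeping line order. */
    static List<String> concatInOrder(List<List<String>> files) {
        return files.stream()
                .flatMap(List::stream) // each inner list is fully drained before the next
                .collect(Collectors.toList());
    }
}
```

A merging strategy, by contrast, is free to interleave lines from different files, which could pull apart the two lines that share an id and break the pairwise averaging later in the pipeline.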
ByteString above is the representation of a byte sequence used in Akka Streams. In our case we want to parse the byte stream as a CSV file, and for that we again use one of the Alpakka modules, this time alpakka-csv:
private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields = Flow.of(ByteString.class).via(CsvParsing.lineScanner());
Note the via combinator used here: it lets us attach an arbitrary Flow to the output of another graph step (a Source or another Flow). The result is a stream of elements, each of which corresponds to the fields of one line of the CSV file. We can then convert them into our domain model:
class Reading {

    private final int id;
    private final double value;

    private Reading(int id, double value) {
        this.id = id;
        this.value = value;
    }

    double getValue() {
        return value;
    }

    @Override
    public String toString() {
        return String.format("Reading(%d, %f)", id, value);
    }

    static Reading create(Collection<ByteString> fields) {
        List<String> fieldList = fields.stream().map(ByteString::utf8String).collect(toList());
        int id = Integer.parseInt(fieldList.get(0));
        double value = Double.parseDouble(fieldList.get(1));
        return new Reading(id, value);
    }
}
For the conversion itself we use the map method and pass it a reference to the Reading.create method:
private final Flow<Collection<ByteString>, Reading, NotUsed> readings = Flow.<Collection<ByteString>>create().map(Reading::create);
At the next stage we need to group the readings into pairs, compute the average value of each group, and pass the result on only when it exceeds a given threshold. Since we want the averages to be computed asynchronously, we use the mapAsyncUnordered method, which runs an asynchronous operation with the given level of parallelism:
private final Flow<Reading, Double, NotUsed> averageReadings =
    Flow.of(Reading.class)
        .grouped(2)
        .mapAsyncUnordered(10, readings ->
            CompletableFuture.supplyAsync(() ->
                readings.stream()
                    .map(Reading::getValue)
                    .collect(averagingDouble(v -> v))))
        .filter(v -> v > AVERAGE_THRESHOLD);
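The two ideas combined in this step, fixed-size grouping and asynchronous averaging, can be tried out in isolation with plain JDK classes. The sketch below is an approximation rather than what Akka does internally, and the names AsyncAverages, grouped, and averages are invented for it. One deliberate difference: it joins the futures in submission order, which is closer to Akka's order-preserving mapAsync, whereas mapAsyncUnordered emits each result as soon as its future completes, so the averages may arrive in a different order than the input groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AsyncAverages {

    /** Splits the input into consecutive groups of at most {@code size} elements. */
    static <T> List<List<T>> grouped(List<T> items, int size) {
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            groups.add(new ArrayList<>(items.subList(i, Math.min(i + size, items.size()))));
        }
        return groups;
    }

    /** Averages each group asynchronously and returns the results in group order. */
    static List<Double> averages(List<Double> values, int groupSize) {
        List<CompletableFuture<Double>> futures = grouped(values, groupSize).stream()
                .map(group -> CompletableFuture.supplyAsync(() ->
                        group.stream()
                                .mapToDouble(Double::doubleValue)
                                .average()
                                .orElse(Double.NaN)))
                .collect(Collectors.toList());
        // join() in submission order, so the output order is fixed even if
        // the individual computations finish in a different order.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

For this use case the unordered variant is acceptable as long as the consumer only cares about the values themselves and not about which id produced them.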
With the components above defined, we are ready to assemble them into a single pipeline, using the via combinator we already know. It is not complicated at all:
private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings);
Note
When components are combined as shown above, the compiler protects us from accidentally connecting two blocks whose data types are incompatible.
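The same kind of compile-time check can be seen on a much smaller scale with java.util.function.Function, where andThen plays a role comparable to via. This is an analogy only; the class TypedComposition and its constants are invented for the sketch.

```java
import java.util.function.Function;

class TypedComposition {

    static final Function<String, Integer> PARSE = Integer::parseInt;
    static final Function<Integer, Double> HALVE = n -> n / 2.0;

    // Compiles because PARSE's output type matches HALVE's input type.
    static final Function<String, Double> PIPELINE = PARSE.andThen(HALVE);

    // HALVE.andThen(PARSE) would be rejected at compile time:
    // HALVE produces a Double, but PARSE expects a String.
}
```

In the pipeline above the same reasoning applies to each via call: the output type of one Flow has to line up with the input type of the next.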
The stream as a web socket
Now we use Akka HTTP to create a simple web server that does two things:
- serves the source of readings as a web socket;
- serves a trivial web page that connects to the web socket and displays the data it receives.
Creating a web server with Akka HTTP takes almost no effort: we just extend HttpApp and provide the required mappings with the routing DSL:
class Server extends HttpApp {

    private final Source<Double, NotUsed> readings;

    Server(Source<Double, NotUsed> readings) {
        this.readings = readings;
    }

    @Override
    protected Route routes() {
        return route(
            path("data", () -> {
                Source<Message, NotUsed> messages =
                    readings.map(String::valueOf).map(TextMessage::create);
                return handleWebSocketMessages(
                    Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages));
            }),
            get(() ->
                pathSingleSlash(() ->
                    getFromResource("index.html")))
        );
    }
}
Two routes are defined here: /data, which is the web socket endpoint, and /, which serves the trivial frontend. It is already clear how easy it is to expose an Akka Streams Source as a web socket endpoint: we call handleWebSocketMessages, whose job is to upgrade the HTTP connection to a web socket connection and to set up a flow that handles the incoming and outgoing data. A WebSocket is modeled as a flow, that is, as incoming messages from the client and outgoing messages sent to the client. In our case we want to ignore incoming data, so we create a flow whose "incoming" side is wired to Sink.ignore(). The "outgoing" side of the web socket handler flow is simply wired to our source, where the average values come from. All we need to do with the double values that represent the averages is convert each of them to a TextMessage, the wrapper that Akka HTTP uses for web socket data. All of this is done with the familiar map method.
To start the server, we just call the startServer method with a host name and a port:
Server server = new Server(csvProcessor.liveReadings);
server.startServer(config.getString("server.host"), config.getInt("server.port"));
Frontend
To receive data from the web socket and display it, we use some very simple JavaScript code that just appends the received values to a textarea. The code uses ES6 syntax and should work fine in modern browsers.
let ws = new WebSocket("ws://localhost:8080/data");
ws.onopen = () => log("WS connection opened");
ws.onclose = event => log("WS connection closed with code: " + event.code);
ws.onmessage = event => log("WS received: " + event.data);
The log method appends the message to the textarea and adds a timestamp.
Running the application
To run and test the application:
- Start the server (sbt run).
- Open localhost:8080 in a browser (or the host and port you chose, if you changed the defaults).
- Copy one or more files from src/main/resources/sample-data into the data directory in the project root (unless you changed csv-processor.data-dir in the configuration).
- Watch the data appear in the server logs and in the browser.
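If you would rather feed the pipeline your own input than copy the bundled sample files, a small generator is enough to produce lines in the same (id, value) format. The sketch below is not part of the original project; the class SampleData, the method generate, and the choice of six-digit ids are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class SampleData {

    /** Builds 2 * pairs lines of "id,value"; each id appears on two consecutive lines. */
    static List<String> generate(int pairs, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            int id = 100_000 + random.nextInt(900_000); // six-digit id, like the sample rows
            lines.add(id + "," + random.nextDouble());
            lines.add(id + "," + random.nextDouble());
        }
        return lines;
    }
}
```

The generated lines can be written into the watched directory with java.nio.file.Files.write, for example to a file such as data/generated.csv (a hypothetical name). Values from nextDouble() are uniform in [0, 1), so only a small fraction of the pair averages will exceed the 0.9 threshold.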
Adding a mail trigger
The final touch for our application is a side channel that simulates an email notification sent after every fifth element that reaches the web socket. It has to work "on the side" so that it does not interfere with the delivery of the main elements.
To implement this behavior we use a more advanced feature of Akka Streams, the Graph DSL, to write our own graph step that splits the stream into two parts. The first part simply passes the values on to the web socket; the second keeps track of when another five elements have passed and sends an email notification (see Fig. 3).
Fig. 3. A custom graph step that sends mail
We use the built-in Broadcast step, which sends its input to a declared set of outputs. We also write our own sink, the mailer:
private final Graph<FlowShape<Double, Double>, NotUsed> notifier = GraphDSL.create(builder -> {
    Sink<Double, NotUsed> mailerSink = Flow.of(Double.class)
        .grouped(EMAIL_THRESHOLD)
        .to(Sink.foreach(ds -> logger.info("Sending e-mail")));

    UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2));
    SinkShape<Double> mailer = builder.add(mailerSink);

    builder.from(broadcast.out(1)).toInlet(mailer.in());

    return FlowShape.of(broadcast.in(), broadcast.out(0));
});
We start building our own graph step with the GraphDSL.create() method, which gives us an instance of the graph Builder used to manipulate the structure of the graph.
Next we define our own sink. It uses grouped to combine incoming elements into groups of a chosen size (5 by default) and passes these groups downstream. For each such group we simulate a side effect: an email notification.
Once our sink is defined, we use the builder instance to add it to the graph. We also add a Broadcast step with two outputs.
Then we specify the connections between the graph elements: one output of the Broadcast step is connected to the mail sink, and the other becomes the output of our graph step. The input of our custom step is wired directly to the input of the Broadcast step.
Note 1
The compiler cannot determine whether all parts of the graph are connected correctly. This is checked at runtime by the materializer, however, so no input or output can be left dangling.
Note 2
You may notice that all the steps described here have the form Graph<S, M>, where S is the shape that determines the number and types of the inputs and outputs, and M is the materialized value (if any). Here we are dealing with a Flow shape, that is, one input and one output.
As the last step we attach notifier as an additional step of the liveReadings pipeline, which now looks like this:
private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings)
        .via(notifier);
If you run the updated code, you will see messages about email notifications appear in the log. A notification is sent every time another five values have passed through to the web socket.
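The behavior of the notifier step, forwarding every element while counting them in groups on the side, can be mimicked in a few lines of synchronous Java. This is a simplified stand-in that ignores asynchrony and backpressure altogether; the class NotifierSketch and its members are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

class NotifierSketch {

    final List<Double> forwarded = new ArrayList<>(); // stands in for the web socket branch
    int notifications = 0;                            // stands in for "e-mails sent"
    private final int groupSize;
    private int pending = 0;

    NotifierSketch(int groupSize) {
        this.groupSize = groupSize;
    }

    /** Forwards the value unchanged and notifies once per full group. */
    void accept(double value) {
        forwarded.add(value);         // main branch: every element passes through
        if (++pending == groupSize) { // side branch: count elements into groups
            notifications++;
            pending = 0;
        }
    }

    /** Feeds all values through a fresh notifier and returns it for inspection. */
    static NotifierSketch run(int groupSize, double... values) {
        NotifierSketch notifier = new NotifierSketch(groupSize);
        for (double v : values) {
            notifier.accept(v);
        }
        return notifier;
    }
}
```

With a group size of 5 and twelve incoming values, all twelve are forwarded and two notifications are counted; the remaining two values are still waiting for their group to fill up.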
Summary
In this article we looked at the general concepts of stream data processing and learned how to build a lightweight data processing pipeline with Akka Streams. It is an alternative to the traditional approach used in Java EE.
We saw how to use some of the processing steps built into Akka Streams and how to write our own step with the Graph DSL. We also showed how to use Alpakka to stream data from the file system, and how Akka HTTP lets you create a simple web server with a web socket endpoint that integrates seamlessly with Akka Streams.
A complete working example of the code from this article is available on GitHub. It contains several additional log steps at various points, which help you see more precisely what is going on inside the pipeline. They were deliberately left out of this article to keep it short.