RDD, kasutades Sparki: Apache Sparki ehituskivi

See Sparki kasutav RDD-blogi annab teile üksikasjalikud ja põhjalikud teadmised RDD-st, mis on Sparki põhiline üksus ja kui kasulik see on.

, Sõna ise on piisav, et tekitada sära iga Hadoopi inseneri peas. TO n mälus töötlemise tööriist mis on klastriarvutuses välkkiire. Võrreldes MapReduce'iga muudab mälusisene andmete jagamine RDD-d 10-100x kiiremini kui võrgu ja ketta jagamine ja see kõik on võimalik RDD-de (elastsed hajutatud andmekogumid) tõttu. Võtmekohad, millele me täna selles Sparki artiklis RDD-s keskendume, on järgmised:



Kas vajate RDD-sid?

Miks vajame Sparki kasutades RDD-d?



Maailm areneb koos ja Andmeteadus aastal edasimineku tõttu . Algoritmid põhineb Taandareng , , ja mis töötab edasi Levitatakse Iteratiivne arvutus liitumine mood, mis hõlmab andmete taaskasutamist ja jagamist mitme arvutiseadme vahel.

Traditsiooniline tehnikad vajasid stabiilset vahe- ja jaotatud salvestusruumi HDFS mis sisaldab korduvaid arvutusi koos andmete replikatsioonide ja andmete järjestamisega, mis muutis protsessi palju aeglasemaks. Lahenduse leidmine polnud kunagi lihtne.



See on koht RDD-d (Elastsed hajutatud andmekogumid) jõuab üldpildini.

RDD Neid on lihtne kasutada ja nende loomine on vaevatu, kuna andmed imporditakse andmeallikatest ja kukutatakse RDD-desse. Edasi rakendatakse toiminguid nende töötlemiseks. Nad on a hajutatud mälukogumik lubadega as Loe ainult ja mis kõige tähtsam, need on Rikkekindel .



Kui mõni andmesektsioon kohta RDD on kadunud , seda saab taastada, rakendades sama muutumine operatsiooni sellel kaotatud partitsioonil sugupuu , selle asemel et kõiki andmeid nullist töödelda. Selline lähenemine reaalajas stsenaariumides võib panna imesid juhtuma andmete kadumise või süsteemi tööseisaku korral.

Mis on RDD-d?

RDD või ( Elastne hajutatud andmekogum ) on põhiline andmete struktuur Sädemes. Termin Elastne määratleb võime, mis genereerib andmed automaatselt või andmed tagasi veeremine Euroopa algne olek kui ootamatu õnnetus toimub andmete kaotamise tõenäosusega.

RDD-desse kirjutatud andmed on jaotatud ja salvestatud mitu käivitatavat sõlme . Kui täitev sõlm ebaõnnestub jooksu ajal, siis saab see kohe varukoopia järgmine käivitatav sõlm . Seetõttu peetakse RDD-sid muude traditsiooniliste andmestruktuuridega võrreldes täiustatud tüüpi andmestruktuurideks. RDD-d saavad salvestada struktureeritud, struktureerimata ja poolstruktureeritud andmeid.

Läheme edasi oma RDD-ga, kasutades Sparki ajaveebi, ja õppige tundma RDD-de unikaalseid omadusi, mis annab sellele eelise muud tüüpi andmestruktuuride ees.

RDD omadused

  • Mälusisene (RAM) Arvutused : Mälusisene arvutamise mõiste viib andmetöötluse kiiremasse ja tõhusamasse etappi, kus üldiselt jõudlus süsteemi on täiendatud.
  • L tema hinnang : Termin Laisk hindamine ütleb teisendused rakendatakse RDD andmetele, kuid väljundit ei genereerita. Selle asemel on rakendatud teisendused sisse logitud.
  • Püsivus : Sellest tulenevad RDD-d on alati korduvkasutatav.
  • Jämedateralised operatsioonid : Kasutaja saab rakendada teisendusi kõigile andmekogumite elementidele kaart, filter või grupeerida toimingud.
  • Rikketaluv : Andmete kadumise korral saab süsteem seda teha tagasi veerema selle juurde algne olek kasutades logitud teisendused .
  • Muutmatus : Määratletud, hangitud või loodud andmeid ei saa olla muutunud kui see on süsteemi sisse logitud. Kui peate olemasolevale RDD-le juurde pääsema ja seda muutma, peate looma uue RDD-d, rakendades komplekti Muutumine funktsioone praegusel või sellele eelneval RDD-l.
  • Jaotus : See on ülioluline üksus paralleelsus Sparkis RDD. Vaikimisi põhineb loodud sektsioonide arv teie andmeallikal. Võite isegi otsustada, mitu partitsiooni soovite kasutada kohandatud partitsioon funktsioone.

RDD loomine Sparki abil

RDD-sid saab luua kolmel viisil:

  1. Andmete lugemine saidilt paralleelsed kogud
val PCRDD = spark.sparkContext.parallelize (Array ('E', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Kandideerimine muutumine varasematel RDD-del
val sõnad = spark.sparkContext.parallelize (Seq ('Säde', 'is', 'a', 'väga', 'võimas', 'keel')) val wordpair = sõnad.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Andmete lugemine saidilt väline salvestusruum või failirajad nagu HDFS või HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

RDD-dega tehtavad toimingud:

RDD-dega tehakse peamiselt kahte tüüpi toiminguid, nimelt:

  • Teisendused
  • Toimingud

Teisendused : The toimingud rakendame RDD-dele filter, juurdepääs ja muutma vanema RDD andmed a genereerimiseks järjestikune RDD kutsutakse muutumine . Uus RDD tagastab kursori eelmisele RDD-le, tagades nende vahelise sõltuvuse.

Teisendused on Laisad hinnangud, Teisisõnu, RDD-s rakendatavad toimingud, mida te töötate, logitakse, kuid mitte hukati. Pärast süsteemi käivitamist viskab süsteem tulemuse või erandi Tegevus .

Me võime jagada teisendused kahte tüüpi, nagu allpool:

  • Kitsad teisendused
  • Laiad teisendused

Kitsad teisendused Rakendame a-le kitsaid teisendusi üks partitsioon vanema RDD-le uue RDD genereerimiseks, kuna RDD töötlemiseks vajalikud andmed on saadaval jaotise ühes partitsioonis ema ASD . Kitsa teisenduse näited on järgmised:

  • kaart ()
  • filter ()
  • flatMap ()
  • partitsioon ()
  • mapPartitions ()

Laiad teisendused: Rakendame laia transformatsiooni mitu sektsiooni uue RDD genereerimiseks. RDD töötlemiseks vajalikud andmed on saadaval ema ASD . Laiade teisenduste näited on järgmised:

  • vähendadaBy ()
  • liit ()

Toimingud : Toimingud juhendavad Apache Sparki rakendama arvutamine ja edastage tulemus või erand draiveri RDD-le. Vähesed tegevused hõlmavad järgmist:

  • koguma ()
  • loenda ()
  • võtma ()
  • esimene ()

Rakendame toiminguid RDD-del praktiliselt:

IPL (India kõrgliiga) on kriketiturniir, mille tipp on tipptasemel. Niisiis, lubame täna IPL-i andmekogumile kätte saada ja käivitada meie RDD Sparki abil.

  • Esiteks, laadime alla IPL-i CSV-vaste andmed. Pärast selle allalaadimist hakkab see välja nägema nagu ridade ja veergudega EXCEL-fail.

java meetodist välja murda

Järgmises etapis käivitame säde ja laadime faili match.csv selle asukohast, minu puhul minucsvfaili asukoht on „/User/edureka_566977/test/matches.csv”

Alustame nüüd Muutumine esimene osa:

  • kaart ():

Me kasutame Kaardi teisendamine konkreetse teisendustoimingu rakendamiseks RDD iga elemendi jaoks. Siin loome RDD nimega CKfile, kuhu meiecsvfaili. Loome veel ühe RDD, millele kutsutakse riike salvestage linna üksikasjad .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val olekud = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filter ():

Filtri teisendamine, nimi ise kirjeldab selle kasutamist. Me kasutame seda teisendustoimingut, et filtreerida välja valitud andmed valitud andmete kogumist. Kandideerime filtri töö siit, et saada aasta IPL-i mängude rekordid 2017 ja salvestage see faili RDD.

val fil = CKfile.filter (rida => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Rakendame FlatMap on teisendustoiming RDD kõigi elementide jaoks, et luua uus RDD. See sarnaneb kaardi teisendusega. siin me rakendameLame kaartkuni sülitada Hyderabadi linna matšid ja salvestage andmedfilRDDRDD.

val filRDD = fil.flatMap (rida => line.split ('Hyderabad')). koguma ()

  • partitsioon ():

Iga RDD-sse kirjutatav teave jaguneb teatud arvuks partitsioonideks. Kasutame seda teisendust partitsioonide arv andmed jagunevad tegelikult.

val fil = CKfile.filter (rida => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Arvestame MapPatitionsiga Mapi () jaigaühele() koos. Selle leidmiseks kasutame siin mapPartitions ridade arv meil on meie failis RDD.

val fil = CKfile.filter (rida => line.contains ('2016')) fil.mapPartitions (idx => Massiiv (idx.size) .iterator) .collect

  • vähendadaBy ():

Me kasutameReduceBy() peal Põhiväärtuste paarid . Kasutasime seda ümberkujundamistcsvfaili mängija leidmiseks matšide kõrgeim mees .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • liit ():

Nimi seletab kõik, Me kasutame ametiühingu ümberkujundamine on klubi kaks RDD koos . Siin loome kaks RDD-d, nimelt fil ja fil2. fil RDD sisaldab 2017. aasta IPL-i vastete kirjeid ja fil2 RDD sisaldab 2016. aasta IPL-vaste kirjet.

val fil = CKfile.filter (rida => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Alustame Tegevus osa, kus näitame tegelikku väljundit:

  • koguma ():

Kogumine on tegevus, mida me kasutame sisu kuvamine RDD-s.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

mis on meetodi ülekoormamine ja meetodi ülimuslikkus
  • loend ():

Krahvon toiming, mida kasutame kirjete arv olemas RDD-s.Siinkasutame seda toimingut meie match.csv-failis olevate kirjete koguarvude lugemiseks.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • võtma ():

Take on toiming, mis sarnaneb kogumisega, kuid ainus erinevus on see, et see saab printida mis tahes ridade valikuline arv vastavalt kasutaja soovile. Siin rakendame järgmise koodi printimiseks esikümnesse juhtivad aruanded.

val statecountm = Scount.reduceByKey ((x, y) => x + y). kaart (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. võta (10) .foreach (println)

  • esimene ():

Esimene () on toiming, mis on sarnane kogumise () ja võtmise ()sedakasutatakse ülemise aruande s väljundi printimiseks Siin kasutame esimese () toimingut teatud linnas mängitud mängude maksimaalne arv ja väljundiks saame Mumbai.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val olekud = CKfile.map (_. split (',') (2)) val Scount = olekud.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach [println] val statecountm = Scount.reduceByKey ((x, y) => x + y) .kaart (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Selleks, et muuta meie protsess meie Sparki abil RDD õppimiseks veelgi huvitavamaks, olen välja mõelnud huvitava kasutusjuhtumi.

RDD, kasutades Sparki: Pokemoni kasutamise juhtum

  • Esiteks, Laadime alla Pokemon.csv-faili ja laadime selle sädemekarpi nagu faili Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemonid on tegelikult saadaval mitmesugustes sortides. Leiame mõned sordid.

  • Skeemi eemaldamine failist Pokemon.csv

Me ei pruugi seda vajada Skeem failist Pokemon.csv. Seega eemaldame selle.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Nende arvu leidmine vaheseinad meie pokemon.csv on jaotatud.
println ('Partitsioonide arv =' + NoHeader.partitions.size)

  • Vesi Pokemon

Leidmine veepokemooni arv

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Tuli Pokemon

Leidmine Fire pokemoni arv

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Samuti võime tuvastada elanikkonnast erinevat tüüpi pokemone, kasutades loendusfunktsiooni
WaterRDD.count () FireRDD.count ()

  • Kuna mulle meeldib mäng kaitsestrateegia leidkem pokemon koos maksimaalne kaitse.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Kõrgeim_kaitse:' + defenceList.max ())

  • Me teame maksimaalselt kaitse tugevuse väärtus aga me ei tea, mis pokemon see on. leiame siis, mis see on pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. kaart {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Tellimine [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Nüüd lahendame pokemoni koos kõige vähem kaitse
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Vaatame nüüd Pokemonit a-ga vähem kaitsev strateegia.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Tellimine [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Nii jõuame sellega Sparki artikliga RDD-le otsa. Loodan, et panime pisut valgust teie teadmistele RDD-de, nende funktsioonide ja erinevat tüüpi toimingute kohta, mida nendega saab teha.

See artikkel põhineb on mõeldud selleks, et valmistada teid ette Cloudera Hadoopi ja Sparki arendaja sertifitseerimise eksamiks (CCA175). Saad põhjalikud teadmised Apache Sparki ja Sparki ökosüsteemi kohta, mis sisaldab Spark RDD-d, Spark SQL-i, Spark MLlibi ja Spark Streamingut. Saad põhjalikud teadmised Scala programmeerimiskeele, HDFS-i, Sqoopi, Flume'i, Spark GraphXi ja sõnumsüsteemi kohta, näiteks Kafka.