Running Spark
Loading a text file
Save these lines into a file called triples.txt in some folder, for example your home folder ~:
AbrahamAbel born 1992
AbrahamAbel livesIn duluth
AbrahamAbel phone 789456123
BetsyBertram born 1987
BetsyBertram livesIn berlin
BetsyBertram phone _anon001
BetsyBertram phone _anon002
_anon001 area 56
_anon001 local 9874321
_anon001 ext 123
_anon002 area 56
_anon002 local 1234789
CharleneCharade born 1963
CharleneCharade bornIn bristol
CharleneCharade address _anon003
CharleneCharade knows BetsyBertram
_anon003 street brislingtonGrove
_anon003 number 13
_anon003 postCode bs4
_anon003 postName bristol
DirkDental born 1996
DirkDental bornIn bergen
DirkDental knows BetsyBertram
DirkDental knows CharleneCharade
In a console (or command prompt, or terminal) window, start the Spark shell:
spark-shell
From now on, we will run our commands inside the Spark shell, after the scala> prompt. Load the triples.txt file into Spark (replace /home/sinoa with the folder where you saved the file):
val triples_str = sc.textFile("/home/sinoa/triples.txt")
(Use forward slashes (/) in the path, even on Windows.)
triples_str is now the name of a Resilient Distributed Dataset (RDD) inside your spark-shell. Spark evaluates RDDs lazily, so the file has not actually been read yet. You can force the file to load and look at the resulting contents of the triples_str RDD with:
triples_str.collect()
(In Scala, you can usually drop empty parentheses (), which we will do from now on, so triples_str.collect also works. Some newer Scala versions print a deprecation warning when you do.)
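If you want to experiment without a file on disk, an RDD can also be built from an in-memory collection. This is a minimal sketch, assuming you are still inside spark-shell (where sc is predefined); the name sample_str is made up for illustration, and the three lines are copied from triples.txt.

```scala
// Assumes a running spark-shell, where sc (the SparkContext) is predefined.
// sample_str is a hypothetical name used only in this sketch.
val sample_str = sc.parallelize(Seq(
  "AbrahamAbel born 1992",
  "AbrahamAbel livesIn duluth",
  "AbrahamAbel phone 789456123"
))

// Nothing is computed until an action such as collect() is called.
val sample_lines = sample_str.collect()
sample_lines.foreach(println)
```

The result of collect() is an ordinary Scala Array[String] that lives in the shell, not in the cluster.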
Spark transformations and actions
We are now ready to try out simple Spark transformations and actions. Transformations define new RDDs from existing ones and are evaluated lazily, whereas actions trigger the computation and either return a value to the shell or produce a side effect, such as writing files.
This action counts the number of lines in triples.txt (or strings in triples_str):
triples_str.count
These actions get the first line and the first five lines in triples_str:
triples_str.first
triples_str.take(5)
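Because actions return ordinary Scala values, their results can be stored in variables and used later. A minimal sketch, assuming a running spark-shell (sc predefined); demo_str and its three lines are made up for illustration:

```scala
// Assumes spark-shell; demo_str is a hypothetical RDD used only here.
val demo_str = sc.parallelize(Seq("a b c", "d e f", "g h i"))

val demo_count = demo_str.count()    // a Long: the number of elements
val demo_first = demo_str.first()    // a String: the first element
val demo_two   = demo_str.take(2)    // an Array[String] with the first two elements

println(s"$demo_count lines, starting with: $demo_first")
```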
This action saves the triples into a new folder /home/sinoa/triples_copy, as one or more part-files (the folder must not exist beforehand):
triples_str.saveAsTextFile("/home/sinoa/triples_copy")
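To see what saveAsTextFile leaves behind, the saved folder can be read back in with textFile. The sketch below assumes a running spark-shell and a local file system; the temporary folder and the names save_demo, out_dir and reloaded are illustrative only.

```scala
import java.nio.file.Files

// Assumes spark-shell (sc predefined). Two partitions, so two part-files are written.
val save_demo = sc.parallelize(Seq("DirkDental born 1996", "DirkDental bornIn bergen"), 2)

// saveAsTextFile needs a folder that does not exist yet, so we pick a fresh
// name inside a newly created temporary folder (a file:/// URI).
val out_dir = Files.createTempDirectory("triples_demo").resolve("copy").toUri.toString
save_demo.saveAsTextFile(out_dir)

// Pointing textFile at the folder reads all its part-files back in.
val reloaded = sc.textFile(out_dir)
val reloaded_count = reloaded.count()
```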
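This transformation creates a new RDD with a random sample of the lines from triples.txt. It samples without replacement (the first argument is false), so no line is picked more than once; 0.5 is the expected fraction of lines to keep.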
triples_str.sample(false, 0.5, scala.util.Random.nextInt).collect
This transformation samples with replacement, so it is likely to introduce duplicate lines:
triples_str.sample(true, 0.9, scala.util.Random.nextInt).collect
Save the result in a new RDD and rerun until triples_dup contains at least one duplicate line:
val triples_dup = triples_str.sample(true, 0.9, scala.util.Random.nextInt)
triples_dup.collect
This transformation removes duplicates:
triples_dup.distinct.collect
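The difference between the two sampling modes can be checked by comparing count with distinct.count. This is a sketch under assumptions: it runs in spark-shell, uses the numbers 1 to 100 instead of the triples, and fixes the seed to 42 (an arbitrary choice) so that repeated evaluations give the same sample.

```scala
// Assumes spark-shell (sc predefined); nums and the seed are illustrative.
val nums = sc.parallelize(1 to 100)
val seed = 42L

val without_repl = nums.sample(false, 0.5, seed)  // each number at most once
val with_repl    = nums.sample(true, 0.9, seed)   // numbers may repeat

// Without replacement, removing duplicates changes nothing.
val no_dups = without_repl.distinct.count == without_repl.count

// With replacement, distinct can only shrink the RDD (or leave it unchanged).
val dups_removed = with_repl.count - with_repl.distinct.count
println(s"no duplicates without replacement: $no_dups, duplicates removed: $dups_removed")
```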
These are only a few of the simplest Spark transformations and actions. For a full list, see this tutorial page: https://www.tutorialspoint.com/apache_spark/apache_spark_core_programming.htm .
Unions and intersections
Save these lines into a file called more_triples.txt:
DirkDental born 1996
DirkDental bornIn bergen
DirkDental knows CharleneCharade
EnyaEntity born 2002
EnyaEntity address _anon001
EnyaEntity knows CharleneCharade
EnyaEntity knows DirkDental
_anon001 street emmastrasse
_anon001 number 7
_anon001 postArea _anon002
_anon002 postCode 45130
_anon002 postName Essen
These transformations produce all the lines that are in either file (the union, which keeps duplicates) and all the lines that are in both files (the intersection):
val more_triples = sc.textFile("/home/sinoa/more_triples.txt")
triples_str.union(more_triples).collect
triples_str.intersection(more_triples).collect
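On a tiny example it is easier to see that union keeps duplicates while intersection does not. A minimal sketch, assuming spark-shell; left_rdd, right_rdd and their contents are made up for illustration:

```scala
// Assumes spark-shell (sc predefined); the data is hypothetical.
val left_rdd  = sc.parallelize(Seq("x knows y", "y knows z"))
val right_rdd = sc.parallelize(Seq("y knows z", "z knows x"))

// union simply appends: "y knows z" appears twice.
val in_either = left_rdd.union(right_rdd).collect()

// Chain distinct if you want each line only once.
val in_either_once = left_rdd.union(right_rdd).distinct.collect()

// intersection keeps only the lines present in both RDDs, without duplicates.
val in_both = left_rdd.intersection(right_rdd).collect()
```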
Functions
One of the most important ideas in Spark is passing anonymous functions as parameters to Spark transformations and actions. The functions have to be written using Scala syntax. We cannot go into detail about Scala's anonymous function syntax, but we can go some way using very simple functions (although eventually you will need a deeper understanding of Java's and Scala's overlapping type systems).
This action concatenates all the lines in "triples_str" into a single string:
triples_str.reduce(_ + _)
Here, we passed the built-in function + (using the underscores _ to indicate the position of the two parameters to +).
If we want more control, we can write the action like this:
triples_str.reduce((str1, str2) => str1 + " // " + str2)
Here, we defined our own anonymous Scala function (str1, str2) => str1 + " // " + str2, with two input parameters str1 and str2 and the concatenated string str1 + " // " + str2 as the result (Spark and Scala work together to keep track of the parameter and result types for us).
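One caveat is worth a small experiment: Spark expects the function passed to reduce to be commutative and associative, and string concatenation is not commutative, so on a multi-partition RDD the order of the pieces in the result can vary. The sketch below assumes spark-shell and sidesteps the issue by putting the (made-up) data in a single partition; it also shows a reduce whose result does not depend on order.

```scala
// Assumes spark-shell (sc predefined). The second argument to parallelize
// asks for exactly one partition, so elements are combined left to right.
val letters = sc.parallelize(Seq("a", "b", "c"), 1)

val joined = letters.reduce((str1, str2) => str1 + " // " + str2)
// joined is "a // b // c"

// Addition is commutative and associative, so this result is the same
// however the RDD is partitioned.
val total_length = letters.map(str => str.length).reduce(_ + _)
```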
We can also use boolean anonymous functions to filter (a transformation) lines in our RDD:
triples_str.filter(line => line.matches("AbrahamAbel.*")).collect
Here, the anonymous Scala function is line => line.matches("AbrahamAbel.*") with line as parameter and line.matches("AbrahamAbel.*") as the result. The latter expression is very similar to the one we wrote in Java for Hadoop: Scala builds on Java and on Java's String API.
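Note that String.matches tests the whole line against the regular expression, which is why the pattern ends in .* here. For a plain prefix test, startsWith is an alternative. A minimal sketch, assuming spark-shell; people_str is a hypothetical RDD with three lines in the style of triples.txt:

```scala
// Assumes spark-shell (sc predefined); people_str is illustrative.
val people_str = sc.parallelize(Seq(
  "AbrahamAbel born 1992",
  "BetsyBertram born 1987",
  "AbrahamAbel livesIn duluth"
))

// Regular expression: must match the entire line.
val by_regex = people_str.filter(line => line.matches("AbrahamAbel.*")).collect()

// Prefix test: same result here, without regular expressions.
val by_prefix = people_str.filter(line => line.startsWith("AbrahamAbel")).collect()

// Without the trailing .*, no complete line matches.
val no_match = people_str.filter(line => line.matches("AbrahamAbel")).count()
```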
This transformation splits each line (a string) in triples_str into an array of three strings:
val triples = triples_str.map(line => line.split(" "))
This transformation creates a new RDD with all the subjects, predicates, and objects in the triples, flattened into a single sequence of strings:
triples.flatMap(arr => Array(arr(0), arr(1), arr(2))).collect
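The difference between map and flatMap shows up in the element type and the element count of the result. A minimal sketch, assuming spark-shell; two_lines and the derived names are made up for illustration:

```scala
// Assumes spark-shell (sc predefined); the two lines come from triples.txt.
val two_lines = sc.parallelize(Seq("DirkDental born 1996", "DirkDental bornIn bergen"))

// map: one output element per input element, here an Array[String] per line.
val as_arrays = two_lines.map(line => line.split(" "))

// flatMap: the arrays are flattened, giving one String per token.
val as_tokens = two_lines.flatMap(line => line.split(" "))

val array_count = as_arrays.count()   // 2 arrays
val token_count = as_tokens.count()   // 6 strings
```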
We can now map the triples back to nicer-looking strings:
triples.map(arr => "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> ").collect
triples.map(arr => "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> ").reduce(_+_)
(In Scala, arr(0) is the first element in an array arr, arr(1) is the second, and so on.)
You can map the set of triples by their subject (setting arr(0) to be the key) and then reduce them by subject as follows (the final collect brings the reduced pairs back to the shell so you can see them):
val triples_map = triples.map(arr => (arr(0), "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> "))
triples_map.reduceByKey(_+_).collect
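The same key-then-reduce pattern works for any value type, not only strings. As an illustration, this sketch counts the triples per subject; it assumes spark-shell, and few_triples and the other names are hypothetical.

```scala
// Assumes spark-shell (sc predefined); the three lines are illustrative.
val few_triples = sc.parallelize(Seq(
  "DirkDental born 1996",
  "EnyaEntity born 2002",
  "DirkDental bornIn bergen"
)).map(line => line.split(" "))

// Key each triple by its subject and pair it with the number 1 ...
val ones_by_subject = few_triples.map(arr => (arr(0), 1))

// ... then add up the 1s that share a key, and fetch the result as a Map.
val triples_per_subject = ones_by_subject.reduceByKey(_ + _).collectAsMap()
// contains DirkDental -> 2 and EnyaEntity -> 1
```

reduceByKey is only available on RDDs of (key, value) pairs, which is why the map step has to produce tuples first.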
For more transformations and actions, see the Spark RDD programming guide:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds
Tasks
- Use Spark's flatMap transformation to collect an array of distinct resources from the triples (i.e., those strings starting with a capital letter).
- After reduction, the triple <CharleneCharade knows BetsyBertram> only appears for CharleneCharade. We want it to appear for BetsyBertram too.
- After reduction, the triple <_anon001 area 56> appears for _anon001. We want to eliminate _anon001 so that it appears for BetsyBertram instead. The trick is to use Spark's join transformation in the right way.
- Include the triples from more_triples.txt in the map-reduce too. Note that _anon001 occurs in both files, but represents a different anonymous node.
- Make sure that your map-reduce job also eliminates nested anonymous nodes: more_triples.txt has two levels of anonymous nodes, so the triple <_anon002 postCode 45130> should appear for EnyaEntity.
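As a hint for the join-based tasks, here is how join behaves on two small pair RDDs. This is only a sketch of the transformation's semantics on made-up data, assuming spark-shell; it is not a solution, and the names owners, details and joined are hypothetical.

```scala
// Assumes spark-shell (sc predefined); keys and values are illustrative.
val owners  = sc.parallelize(Seq(("n1", "alice"), ("n2", "bob")))
val details = sc.parallelize(Seq(("n1", "red"), ("n1", "blue"), ("n3", "green")))

// join is an inner join on the key: the result has one element
// (key, (leftValue, rightValue)) for every pair of entries sharing a key.
val joined = owners.join(details).collect()

// Keys without a partner on the other side ("n2", "n3") are dropped.
joined.foreach(println)
```

To use join on the triples, you first have to decide which part of each triple becomes the key on each side.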