Running Spark

Loading a text file

Save these lines into a file called triples.txt in some folder, for example your home folder ~:

AbrahamAbel born 1992
AbrahamAbel livesIn duluth
AbrahamAbel phone 789456123
BetsyBertram born 1987
BetsyBertram livesIn berlin
BetsyBertram phone _anon001
BetsyBertram phone _anon002
_anon001 area 56
_anon001 local 9874321
_anon001 ext 123
_anon002 area 56
_anon002 local 1234789
CharleneCharade born 1963
CharleneCharade bornIn bristol
CharleneCharade address _anon003
CharleneCharade knows BetsyBertram
_anon003 street brislingtonGrove
_anon003 number 13
_anon003 postCode bs4
_anon003 postName bristol
DirkDental born 1996
DirkDental bornIn bergen
DirkDental knows BetsyBertram
DirkDental knows CharleneCharade

In a console (or command prompt, or terminal) window, start the Spark shell:

   spark-shell

From now on, we will run our commands inside the Spark shell, after the scala> prompt. Load the triples.txt file into Spark:

   val triples_str = sc.textFile("/home/sinoa/triples.txt")

(You must use a forward slash / even on Windows.)

triples_str is now the name of a Resilient Distributed Dataset (RDD) inside your spark-shell. Loading is lazy; you can force the file to be read and look at the resulting contents of the triples_str RDD with:

   triples_str.collect()

(In Scala, you can always drop empty parentheses (), which we will do from now on - so triples_str.collect also works.)
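
In a local spark-shell, the result of collect should look roughly like this (shortened here; the resN numbering will vary):

   res0: Array[String] = Array(AbrahamAbel born 1992, AbrahamAbel livesIn duluth, AbrahamAbel phone 789456123, ...)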

Spark transformations and actions

We are now ready to try out simple Spark transformations and actions: transformations create new RDDs and are evaluated lazily, whereas actions trigger the actual computation and return values or produce side effects.
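
For example (a minimal sketch; the name no_dups is our own):

   val no_dups = triples_str.distinct   // transformation: returns a new RDD, nothing is computed yet
   no_dups.count                        // action: triggers the computation and returns a count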

This action counts the number of lines in triples.txt (or strings in triples_str):

   triples_str.count

These actions return the first line and the first 5 lines of triples_str:

   triples_str.first
   triples_str.take(5)

This action saves the triples into a new folder /home/sinoa/triples_copy (Spark creates the folder and writes one part-NNNNN file per partition inside it):

   triples_str.saveAsTextFile("/home/sinoa/triples_copy")

This transformation creates a new RDD with a random sample of the lines in triples.txt: the arguments say to sample without replacement (false), to keep each line with probability 0.5, and to use a fresh random seed:

   triples_str.sample(false, 0.5, scala.util.Random.nextInt).collect

This transformation samples with replacement and is therefore likely to introduce duplicate lines:

   triples_str.sample(true, 0.9, scala.util.Random.nextInt).collect

Save the result in a new RDD and rerun until triples_dup contains at least one duplicate line:

   val triples_dup = triples_str.sample(true, 0.9, scala.util.Random.nextInt)
   triples_dup.collect

This transformation removes duplicates:

   triples_dup.distinct.collect

These are only a few of the simplest Spark transformations and actions. For more examples, see this tutorial page: https://www.tutorialspoint.com/apache_spark/apache_spark_core_programming.htm .

Unions and intersections

Save these lines into a file called more_triples.txt:

DirkDental born 1996
DirkDental bornIn bergen
DirkDental knows CharleneCharade
EnyaEntity born 2002
EnyaEntity address _anon001
EnyaEntity knows CharleneCharade
EnyaEntity knows DirkDental
_anon001 street emmastrasse
_anon001 number 7
_anon001 postArea _anon002
_anon002 postCode 45130
_anon002 postName Essen

These transformations produce all the lines that are in either file (union) and all the lines that are in both files (intersection):

   val more_triples = sc.textFile("/home/sinoa/more_triples.txt")
   triples_str.union(more_triples).collect
   triples_str.intersection(more_triples).collect
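
With the two example files, the intersection should contain exactly the three DirkDental lines that occur in both (the ordering may differ):

   res1: Array[String] = Array(DirkDental born 1996, DirkDental bornIn bergen, DirkDental knows CharleneCharade)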

Functions

One of the most important ideas in Spark is passing anonymous functions as parameters to Spark transformations and actions. The functions must be written in Scala syntax. We cannot go into detail about Scala's anonymous function syntax here, but we can get quite far with very simple functions (although eventually you will need a deeper understanding of Java's and Scala's overlapping type systems).

This action concatenates all the lines in triples_str into a single string:

   triples_str.reduce(_ + _)

Here, we passed the built-in function + (using the underscores _ to indicate the position of the two parameters to +).

If we want more control, we can write the action like this:

   triples_str.reduce((str1, str2) => str1 + " // " + str2)

Here, we defined our own anonymous Scala function (str1, str2) => str1 + " // " + str2, with two input parameters str1 and str2 and the concatenated string str1 + " // " + str2 as the result (Spark and Scala work together to keep track of the parameter and result types for us).

We can also pass boolean anonymous functions to filter (a transformation) to keep only matching lines in our RDD:

   triples_str.filter(line => line.matches("AbrahamAbel.*")).collect

Here, the anonymous Scala function is line => line.matches("AbrahamAbel.*") with line as parameter and line.matches("AbrahamAbel.*") as the result. The latter expression is very similar to the one we wrote in Java for Hadoop: Scala builds on Java and on Java's String API.
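
For comparison, the same filter can be written with startsWith instead of a regular expression (a small variation of our own):

   triples_str.filter(_.startsWith("AbrahamAbel")).collect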

This transformation splits each line (a string) in triples_str into an array of three strings, yielding an RDD[Array[String]]:

   val triples = triples_str.map(line => line.split(" "))

This transformation creates an RDD of all the subjects, predicates, and objects in the triples; collect then returns them as one flat array:

   triples.flatMap(arr => Array(arr(0), arr(1), arr(2))).collect
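
To see the difference between map and flatMap, compare the counts; with the 24-line triples.txt above, the first should return 24 (one array per line) and the second 72 (one string per triple component):

   triples.map(arr => Array(arr(0), arr(1), arr(2))).count
   triples.flatMap(arr => Array(arr(0), arr(1), arr(2))).count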

We can now map the triples back to nicer-looking strings:

   triples.map(arr => "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> ").collect
   triples.map(arr => "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> ").reduce(_+_)

(In Scala, arr(0) is the first element in an array arr, arr(1) is the second, and so on.)

You can map the set of triples by their subject (setting arr(0) to be the key) and then reduce them by subject as follows:

   val triples_map = triples.map(arr => (arr(0), "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> "))
   triples_map.reduceByKey(_+_)
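
reduceByKey is itself a transformation, so to see the result you need an action, for example:

   triples_map.reduceByKey(_+_).collect.foreach(println)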

Interested in more transformations and actions?

See: https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds


Tasks

  1. Use Spark's flatMap transformation to collect an array of distinct resources from the triples (i.e., those strings starting with a capital letter).
  2. After reduction, the triple <CharleneCharade knows BetsyBertram> only appears for CharleneCharade. We want it to appear for BetsyBertram too.
  3. After reduction, the triple <_anon001 area 56> appears for _anon001. We want to eliminate _anon001 so that it appears for BetsyBertram instead. The trick is to use Spark's join transformation in the right way.
  4. Include the triples from more_triples.txt in the map-reduce too. Note that _anon001 occurs in both files, but represents a different anonymous node.
  5. Make sure that your map-reduce job also eliminates nested anonymous nodes: more_triples.txt has two levels of anonymous nodes, so that the triple <_anon002 postCode 45130> appears for EnyaEntity.

Solutions to the Tasks

Array of Distinct Resources

Task: Use Spark's flatMap transformation to collect an array of distinct resources from the triples (i.e., those strings starting with a capital letter).

Solution:

val triples_file = sc.textFile("triples.txt")
triples_file.flatMap(line => line.split(" ")).filter(str => str.matches("[A-Z].*")).distinct.collect

More compact syntax, using _ as a shorthand when there is only one argument:

triples_file.flatMap(_.split(" ")).filter(_.matches("[A-Z].*")).distinct.collect

One resource per line:

triples_file.flatMap(_.split(" ")).filter(_.matches("[A-Z].*")).distinct.foreach(res => println(res))

or, more compactly (note that foreach runs on the executors, so on a cluster the printed output goes to the executor logs rather than your console; in a local spark-shell it prints as expected):

triples_file.flatMap(_.split(" ")).filter(_.matches("[A-Z].*")).distinct.foreach(println(_))

Refined Triple Sorting

Task: After reduction, the triple <CharleneCharade knows BetsyBertram> only appears for CharleneCharade. We want it to appear for BetsyBertram too.

Solution: Using the triples_file from the previous solution, we already have this reduction:

val triples = triples_file.map(line => line.split(" "))
val triples_map = triples.map(arr => (arr(0), "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> "))
triples_map.reduceByKey(_+_).foreach(triple => println(triple))

Now finish the solution with these lines; the filter keeps only triples whose object is a resource (i.e., starts with a capital letter), so literal objects such as 1992 do not become keys:

val reverse_triples_map = triples.filter(arr => arr(2).matches("[A-Z].*")).map(arr => (arr(2), "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> "))
triples_map.union(reverse_triples_map).reduceByKey(_+_).foreach(triple => println(triple))
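
After the union, the reduced entry for BetsyBertram should now also include the reversed triples, roughly like this (an illustration; the exact ordering may vary):

   (BetsyBertram,<BetsyBertram born 1987> <BetsyBertram livesIn berlin> ... <CharleneCharade knows BetsyBertram> <DirkDental knows BetsyBertram> )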

Eliminate Anonymous Nodes

Task: After reduction, the triple <_anon001 area 56> appears for _anon001. We want to eliminate _anon001 so that it appears for BetsyBertram instead. The trick is to use Spark's join transformation in the right way.

Solution: We first create two more maps:

val anon_subject_map = triples.filter(arr => arr(0).matches("_.*")).map(arr => (arr(0), arr))
val anon_object_map = triples.filter(arr => arr(2).matches("_.*")).map(arr => (arr(2), arr))

We then join them by their key (the anonymous node):

val joined_map = anon_object_map.join(anon_subject_map)

Inspect this map using joined_map.collect. It has type RDD[(String, (Array[String], Array[String]))] with the anonymous resources (_anon001...) as keys, but we want pairs of type RDD[(String, Array[String])] with the real resources (AbrahamAbel...) as keys. Getting there is a bit hairier:

val deanon_triples_map = joined_map.flatMap { case (k, arrs) => List((arrs._1(0), arrs._1), (arrs._1(0), arrs._2)) }

There are several new things here:

  • the { case (k, v) => ... } notation pattern-matches on each key-value pair, binding the names k and v to its two components
  • arrs._1 picks out the first array in the pair of two arrays coming out of the join
  • arrs._1(0) picks out the subject of the triple in the first array, to be used as a key
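
For example, for _anon001 the join pairs <BetsyBertram phone _anon001> with <_anon001 area 56> (among others), and the flatMap turns each such pair into two entries keyed by the real resource (an illustration, not literal shell output):

   (BetsyBertram, Array(BetsyBertram, phone, _anon001))
   (BetsyBertram, Array(_anon001, area, 56))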

Now we can combine the deanonymised triples with the triples that did not contain anonymous nodes (the regular expression "[^_].*" matches any string that does not start with an underscore _):

val noanon_triples_map = triples.filter(arr => arr(0).matches("[A-Z].*")).filter(arr => arr(2).matches("[^_].*")).map(arr => (arr(0), arr))
val noanon_reverse_triples_map = triples.filter(arr => arr(0).matches("[A-Z].*")).filter(arr => arr(2).matches("[A-Z].*")).map(arr => (arr(2), arr))
val triples_map = noanon_triples_map.union(noanon_reverse_triples_map).union(deanon_triples_map)

Inspect each of these RDDs, for example using .collect.

You can now convert the triple arrays to strings and reduce by key as before:

val collected_triples = triples_map.map { case (k, arr) => (k, "<" + arr(0) + " " + arr(1) + " " + arr(2) + "> ") }.reduceByKey((str1, str2) => str1 + str2)
collected_triples.foreach(println(_))

You can even write the triples for each resource to a separate file (note that foreach runs on the executors: in a local spark-shell the files appear in the shell's working directory, but on a cluster they would be written on the worker machines):

collected_triples.foreach { case (k,v) => java.nio.file.Files.write(java.nio.file.Paths.get(k), v.getBytes) }

Include More Triples

Task: Include the triples from more_triples.txt in the map-reduce too. Note that _anon001 occurs in both files, but represents a different anonymous node.

Solution: This follows the same steps as for the original file, for example:

val more_triples_file = sc.textFile("more_triples.txt")
val more_triples_arr = more_triples_file.map(_.split(" "))

The trick is to keep the anonymous node names in this file and the other one separate:

val more_triples_arr2 = more_triples_arr.map(arr => Array(arr(0).replaceAll("^_", "_file2_"), arr(1), arr(2).replaceAll("^_", "_file2_")))

replaceAll is the standard regular-expression replacement method on Java/Scala Strings. The pattern "^_" matches an underscore at the start of a string, and replaceAll replaces that underscore with _file2_.

From here, you can do a union and continue as before.
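
A minimal sketch of that continuation (all_triples is our own name; we assume the triples RDD from the earlier solutions is still in scope):

   val all_triples = triples.union(more_triples_arr2)

From all_triples you can rebuild the subject map, the reverse map, and the de-anonymisation maps exactly as above.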

Eliminate Nested Anonymous Nodes

Task: Make sure that your map-reduce job also eliminates nested anonymous nodes: more_triples.txt has two levels of anonymous nodes, so that the triple <_anon002 postCode 45130> appears for EnyaEntity.

Solution: This is harder and involves repeating the join operation from the one-level case above; a sketch follows.
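
One possible sketch, not necessarily the intended solution: write a small helper that rewrites every anonymous subject to the node that points to it (its "owner"), and apply it once per nesting level. We assume all_triples, the combined RDD[Array[String]] from the previous solution, with the _file2_ renaming already applied:

   def deanonOnce(ts: org.apache.spark.rdd.RDD[Array[String]]) = {
     // anonymous node -> the subject that points to it (its owner)
     val owners = ts.filter(arr => arr(2).matches("_.*")).map(arr => (arr(2), arr(0)))
     // triples whose subject is anonymous, keyed by that subject
     val anonSubj = ts.filter(arr => arr(0).matches("_.*")).map(arr => (arr(0), arr))
     // replace each anonymous subject by its owner
     val rewritten = anonSubj.join(owners).map { case (_, (arr, owner)) => Array(owner, arr(1), arr(2)) }
     ts.filter(arr => !arr(0).matches("_.*")).union(rewritten)
   }
   // more_triples.txt nests anonymous nodes two levels deep, so two passes suffice
   val deanon_triples = deanonOnce(deanonOnce(all_triples))

After the second pass, the postCode triple has been rewritten to appear under EnyaEntity, and you can key the triples, map them to strings, and reduceByKey as before.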