Tuesday, July 17, 2018

Starting Scala Spark - Streaming via TCP network socket in Windows

This is a simple tutorial on a basic word count in Scala Spark using streaming. There are already many samples that achieve the same thing. So why this post?

One reason is that most posts describe how it works on Linux machines; we rarely find posts whose authors use Windows. Another reason is to make the code understandable to a Scala beginner, for example by using proper names in place of reduceByKey(_+_), which is very difficult to understand at first. Let's get started.

Spark

The code is pretty straightforward, as given below.

package org.apache.spark.examples.streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Counts words in input stream.
object SparkStreaming_HdfsWordCount {
  def main(args: Array[String]): Unit = {
    // Initialization
    Logger.getLogger("org").setLevel(Level.ERROR) // Suppress log noise (the logs are sometimes useful for troubleshooting)
    var master = "local[2]" // TODO: make this immutable. The default makes it easy to run on a local machine; pass -master for cluster mode.
    args.sliding(2, 2).toList.collect {
      case Array("-master", arg: String) => master = arg
    }
    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster(master)
    // Setup streaming
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8081)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => {
      //println(word) // For debugging: prints each individual word
      (word, 1) // 1 means one instance of the word.
    }).reduceByKey((x, y) => x + y) // Sum the counts for each word

    wordCounts.print() // Display the output

    ssc.start()
    ssc.awaitTermination() // Block until the streaming job is stopped
  }
}
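To make the (x, y) => x + y passed to reduceByKey less mysterious, here is a minimal sketch that does the same counting for a single line of text using plain Scala collections, so it runs without Spark. The names ReduceByKeySketch, sumCounts and countWords are made up for this illustration, and groupBy followed by reduce only imitates what reduceByKey does per key; it is not how Spark implements it.

```scala
// Plain Scala collections, no Spark required.
object ReduceByKeySketch {
  // Named version of the `_ + _` shorthand: adds two partial counts.
  val sumCounts: (Int, Int) => Int = (countSoFar, nextCount) => countSoFar + nextCount

  // Hypothetical helper: counts the words in one line of text.
  def countWords(line: String): Map[String, Int] = {
    // Same first step as the streaming code: every word becomes (word, 1).
    val pairs = line.split(" ").filter(_.nonEmpty).map(word => (word, 1))
    pairs
      .groupBy { case (word, _) => word } // gather the pairs that share a key
      .map { case (word, group) =>
        // Fold the 1s of this key into a single total.
        word -> group.map(_._2).reduce(sumCounts)
      }
  }

  def main(args: Array[String]): Unit =
    println(countWords("to be or not to be"))
}
```

Writing reduce(_ + _) in place of reduce(sumCounts) would behave the same way; each underscore simply stands for one of the two arguments.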

A few changes were made from the standard sample versions during troubleshooting. If we create a sample Spark project in the IntelliJ IDE, we get a streaming program that uses a file location as the stream source. It can be a little tedious to get the file system working on Windows machines, with questions such as whether to prefix file:// or file:/// and the confusion over backslashes versus forward slashes. So it is better to use the network as the source.

If the environment is set up correctly, running the above program will try to connect to localhost:8081 and process the network stream.
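When running the program, the master can be overridden with a -master argument. The code above notes that the var holding it should become immutable; one possible way to do that is sketched below. MasterArgSketch and parseMaster are hypothetical names used only for this illustration, and unlike the original loop this version takes the first -master pair it finds, not the last.

```scala
object MasterArgSketch {
  // Returns the value that follows "-master", or the default when the flag is absent.
  // sliding(2, 2) walks the arguments in non-overlapping pairs, as the original code does,
  // so the flag is only recognized at an even position (0, 2, 4, ...).
  def parseMaster(args: Array[String], default: String = "local[2]"): String =
    args.sliding(2, 2).collectFirst {
      case Array("-master", value) => value
    }.getOrElse(default)

  def main(args: Array[String]): Unit =
    println(parseMaster(args))
}
```

With this helper, the line in main could become val master = MasterArgSketch.parseMaster(args), with no var needed.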

PowerShell

The above Scala code makes a connection to port 8081 on localhost. But where is the server? Here, the server is the producer of events into the stream. Most posts talk about a utility called nc, which can be used to generate content for the stream.

But netcat (nc) is primarily a Linux utility. Though there are some equivalents available for Windows, it may be a little difficult to get things working unless we download binaries from known or unknown sites. For simplicity, let's have the producer in PowerShell.

$port=8081
$endpoint = new-object System.Net.IPEndPoint ([system.net.ipaddress]::any, $port) 
$listener = new-object System.Net.Sockets.TcpListener $endpoint
$listener.server.ReceiveTimeout = 5000

$listener.start() 

try {
    Write-Host "Listening on port $port, press CTRL+C to cancel"

    While ($true){

        if (!$listener.Pending())
        {
            Start-Sleep -Seconds 1;
            continue;
        }
        $client = $listener.AcceptTcpClient()
        #$client.client.RemoteEndPoint | Add-Member -NotePropertyName DateTime -NotePropertyValue (get-date) -PassThru

        $id = New-Guid
        $data = [text.Encoding]::ASCII.GetBytes("joymon $id")
        $client.GetStream().Write($data,0,$data.length)
        "Sent message - joymon $id - $(get-date)"

        $client.close()
   }
}
catch {
    Write-Error $_
}
finally {
    $listener.stop()
    Write-Host "Listener Closed Safely"
}

This is again straightforward. Inside a loop, it waits for an incoming connection. Once the connection is established, it sends a message suffixed with a GUID and closes the connection. The GUID helps confirm that every message reaches Spark Streaming. The PowerShell code is mainly composed from these link1 & link2. Thanks to the authors. Just copy and paste the code into PowerShell ISE and run it.
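For readers who would rather keep everything in Scala, the same producer idea can be sketched on the JVM with java.net.ServerSocket. This is only a rough counterpart of the PowerShell listener above, not a tested replacement: SocketProducerSketch, buildMessage and serveOne are names invented here, and the trailing newline is an addition of this sketch, made on the assumption that it helps because socketTextStream reads newline-delimited text.

```scala
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import java.util.UUID

// Hypothetical Scala counterpart of the PowerShell listener.
object SocketProducerSketch {
  // Builds one message: a fixed word followed by a fresh GUID and a newline.
  def buildMessage(): String = s"joymon ${UUID.randomUUID()}\n"

  // Waits for one client, sends it a single message, closes the connection,
  // and returns the message that was sent.
  def serveOne(server: ServerSocket): String = {
    val client = server.accept() // blocks until Spark (or any client) connects
    try {
      val message = buildMessage()
      val out = client.getOutputStream
      out.write(message.getBytes(StandardCharsets.US_ASCII))
      out.flush()
      message
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8081) // same port the Spark code connects to
    println("Listening on port 8081, press CTRL+C to cancel")
    try {
      while (true) println(s"Sent message - ${serveOne(server).trim}")
    } finally server.close()
  }
}
```

As in the PowerShell version, each connection receives exactly one message before it is closed, so the unique GUIDs make it easy to check in the Spark output whether any message went missing.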

Real-world scenarios can be much more complex than this word count, but this gives us an idea of how Spark Streaming works.
