Sahil Kharb edited this page Jan 1, 2016 · 6 revisions

Ruby-Spark

More information

Installation

Add this line to your application's Gemfile:

gem 'ruby-spark'

And then execute:

$ bundle

Or install it yourself as:

$ gem install ruby-spark

Install Apache Spark

Install the latest supported version of Spark. The project is built with SBT.

$ ruby-spark build

Usage

You can use Ruby-Spark via the interactive shell:

$ ruby-spark shell

Or in an existing project:

require 'ruby-spark'
Spark.start

Spark.sc # => context

If you want to configure Spark before starting it (see configurations for more details):

require 'ruby-spark'

Spark.load_lib(spark_home)
Spark.config do
   set_app_name "RubySpark"
   set 'spark.ruby.batch_size', 100
   set 'spark.ruby.serializer', 'oj'
end
Spark.start

Spark.sc # => context

Uploading data

Single file

$sc.text_file(FILE, workers_num, custom_options)

All files in a directory

$sc.whole_text_files(DIRECTORY, workers_num, custom_options)

Direct

$sc.parallelize([1,2,3,4,5], workers_num, custom_options)
$sc.parallelize(1..5, workers_num, custom_options)

Options

workers_num
Minimum number of workers computing this task.
(This value can be overridden by Spark.)
custom_options
serializer: name of the serializer used for this RDD
batch_size: see configuration

(Available only for parallelize)
use: direct (upload directly to Java), file (upload through a file)

Examples

Sum of numbers

$sc.parallelize(0..10).sum
# => 55

Word count using methods

rdd = $sc.text_file(PATH)

rdd = rdd.flat_map(lambda{|line| line.split})
         .map(lambda{|word| [word, 1]})
         .reduce_by_key(lambda{|a, b| a+b})

rdd.collect_as_hash
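
To make the three steps of the pipeline concrete, here is a minimal sketch of the same computation in plain Ruby, without Spark. The `lines` array is hypothetical sample input standing in for the file at PATH, and the `each_with_object` step is only a local stand-in for `reduce_by_key`, not how ruby-spark implements it.

```ruby
# Hypothetical sample input standing in for the lines of the file at PATH.
lines = [
  'to be or not to be',
  'that is the question'
]

# flat_map: split every line into words and flatten them into one list.
words = lines.flat_map { |line| line.split }

# map: pair each word with an initial count of 1.
pairs = words.map { |word| [word, 1] }

# reduce_by_key (local stand-in): add up the counts of pairs sharing a word.
counts = pairs.each_with_object(Hash.new(0)) do |(word, count), acc|
  acc[word] += count
end

puts counts.inspect
```

The resulting hash maps each distinct word to its number of occurrences, which is the shape `collect_as_hash` is expected to return for the Spark version.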

Estimating pi with a custom serializer

slices = 3
n = 100000 * slices

# Sample a random point in the square [-1, 1) x [-1, 1) and
# return 1 if it lies inside the unit circle, 0 otherwise.
def map(_)
  x = rand * 2 - 1
  y = rand * 2 - 1

  x**2 + y**2 < 1 ? 1 : 0
end

rdd = Spark.context.parallelize(1..n, slices, serializer: 'oj')
rdd = rdd.map(method(:map))

puts 'Pi is roughly %f' % (4.0 * rdd.sum / n)
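
The estimate works because points are drawn uniformly from a square of side 2 (area 4), and the unit circle inside it has area pi, so the fraction of points landing in the circle approaches pi / 4. A minimal local sketch of the same idea, without Spark, might look like the following. The seed, the sample count, and the name `inside_unit_circle` are arbitrary choices made for illustration.

```ruby
# Fixed seed so repeated runs give the same estimate (arbitrary choice).
rng = Random.new(42)
n = 100_000

# Returns 1 if a random point in the square [-1, 1) x [-1, 1)
# falls inside the unit circle, 0 otherwise.
inside_unit_circle = lambda do |_|
  x = rng.rand * 2 - 1
  y = rng.rand * 2 - 1
  x**2 + y**2 < 1 ? 1 : 0
end

# Count the hits and scale the hit ratio by the area of the square.
hits = (1..n).sum { |i| inside_unit_circle.call(i) }
pi_estimate = 4.0 * hits / n

puts format('Pi is roughly %f', pi_estimate)
```

The Spark version distributes the same per-point test across `slices` partitions and sums the results.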

Linear regression

Spark::Mllib.import

data = [
  LabeledPoint.new(0.0, [0.0]),
  LabeledPoint.new(1.0, [1.0]),
  LabeledPoint.new(3.0, [2.0]),
  LabeledPoint.new(2.0, [3.0])
]
lrm = LinearRegressionWithSGD.train($sc.parallelize(data), initial_weights: [1.0])

lrm.predict([0.0])
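
As a sanity check for the trained model, the same four points can be fitted in plain Ruby with the closed-form least-squares solution. This is a sketch under one assumption that the page does not state: that the model has no intercept term, so it has the form y = w * x. It is not what SGD computes step by step, and the SGD result should only approximate this value.

```ruby
# Training data from the example above as [label, feature] pairs.
points = [[0.0, 0.0], [1.0, 1.0], [3.0, 2.0], [2.0, 3.0]]

# Least-squares slope for y = w * x (assumed: no intercept):
#   w = sum(x * y) / sum(x * x)
sum_xy = points.sum { |label, x| label * x }
sum_xx = points.sum { |_, x| x * x }
weight = sum_xy / sum_xx

# Prediction for a single feature value under the fitted model.
predict = lambda { |x| weight * x }

puts weight
puts predict.call(0.0)
```

Under the no-intercept assumption the slope is 13 / 14, and the prediction for a feature value of 0.0 is 0.0.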