Sunday, 26 March 2017

Playing with Java date formats



scala> import java.util.Calendar
import java.util.Calendar

scala> val today = Calendar.getInstance().getTime()
today: java.util.Date = Sun Mar 26 00:58:12 PDT 2017

scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat

scala> val hourMinuteSecs = new SimpleDateFormat("kkmmss").format(today)
hourMinuteSecs: String = 245812

scala> val hourMinuteSecs = new SimpleDateFormat("hhmmss").format(today)
hourMinuteSecs: String = 125812

scala> val hourMinuteSecs = new SimpleDateFormat("HHmmss").format(today)
hourMinuteSecs: String = 005812

scala> val hourMinuteSecs = new SimpleDateFormat("HHmmss a").format(today)
hourMinuteSecs: String = 005812 AM
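
For reference: in SimpleDateFormat, H is hour-of-day (0-23), h is hour of AM/PM (1-12), k is hour-of-day (1-24), and K is hour of AM/PM (0-11). A small sketch with a fixed timestamp just past midnight, so the three outputs above are reproducible:

import java.text.SimpleDateFormat

// fixed timestamp just past midnight, to make the outputs deterministic
val midnight = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2017-03-26 00:58:12")

new SimpleDateFormat("kkmmss").format(midnight)  // "245812" - k counts midnight as hour 24
new SimpleDateFormat("hhmmss").format(midnight)  // "125812" - h counts midnight as hour 12
new SimpleDateFormat("HHmmss").format(midnight)  // "005812" - H counts midnight as hour 0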

Tuesday, 14 March 2017

Scala value class vs ref class


Int (like Float, Double, Char, etc.) is a value class in Scala, which is a totally different thing from the Null trait; that's why val x: Int = null does not compile.
final abstract class Int() extends scala.AnyVal {}
What is a value class?
A value class is a class whose instances are not represented as
objects by the underlying host system. All value classes inherit from
class AnyVal.
You can try creating your own value class:
scala> case class ValString(str: String) extends AnyVal
defined class ValString

scala> val string: ValString = null
<console>:13: error: type mismatch;
 found   : Null(null)
 required: ValString
       val string: ValString = null
                               ^
A value class always needs to have a value; it can never be null. That's why it's recommended to use Option[ValString] to represent a possibly missing value.
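A minimal sketch of modelling a possibly missing ValString with Option instead of null:

// instead of null, absence is modelled explicitly
val present: Option[ValString] = Some(ValString("hello"))
val absent: Option[ValString] = None

absent.map(_.str).getOrElse("<no value>")  // "<no value>"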
On the other hand, String is an AnyRef (AnyRef corresponds to java.lang.Object), so a reference can legitimately be null.
For example,
scala> class RefString(str: String) extends AnyRef
defined class RefString

scala> val refString : RefString = null
refString: RefString = null

Saturday, 14 January 2017

hash functions/ cryptographic hash functions



A hash function is a mathematical algorithm that maps data of arbitrary size to a bit string of a fixed size (the hash value);
a cryptographic hash function is additionally designed to be a one-way function.

Secure Hash Algorithm (SHA)

SHA-1 produces a 160-bit (20-byte) hash value known as a message digest.
A SHA-1 hash value is typically rendered as a hexadecimal number, 40 digits long (40 hex digits / 2 = 20 bytes).
$ echo "1" | shasum -a 1
e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e  -

$ echo "11" | shasum -a 1
dd71038f3463f511ee7403dbcbc87195302d891c  -

$ printf 'dd71038f3463f511ee7403dbcbc87195302d891c' | wc -c
      40

$ shasum downloadData.log -a 1
f500ddd45af385b3bbdffdc3457701bf5b9a37a1  downloadData.log

scala> val hash = java.security.MessageDigest.getInstance("SHA-1").digest("1".getBytes())
hash: Array[Byte] = Array(53, 106, 25, 43, 121, 19, -80, 76, 84, 87, 77, 24, -62, -115, 70, -26, 57, 84, 40, -85)

scala> new String(hash)
res5: String = 5j?+y?�LTWM?�F�9T(�
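
The digest is raw bytes, not printable text, which is why new String(hash) prints garbage. Note also that the shell examples above hash "1\n" (echo appends a newline) while the Scala call hashes "1", hence the different digests. A minimal sketch to render the bytes as the usual 40-digit hex string:

scala> val hex = hash.map("%02x".format(_)).mkString
hex: String = 356a192b7913b04c54574d18c28d46e6395428ab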

MD5 (Message-Digest 5, built on the Merkle–Damgård construction)

The MD5 algorithm is a widely used hash function producing a 128-bit hash value.
It is commonly used as a checksum to verify file integrity, but it is cryptographically broken: collisions can be generated within seconds.

$ echo "1" | md5 
b026324c 6904b2a9 cb4b88d6 d61c81d1 ##is a [hex number, compressed to base16](http://stackoverflow.com/q/43556742/432903)

# hash length: 32 hex digits
printf "%s" "b026324c6904b2a9cb4b88d6d61c81d1" | wc -c
      32
# hash bits = (32/2=16bytes)*8 = 128bits

## another way
$ md5 <<<"1"
b026324c6904b2a9cb4b88d6d61c81d1

$ md5 build.sbt 
MD5 (build.sbt) = d10c6aff431a61c5b3bd1a038519900c
scala> import java.security.MessageDigest
import java.security.MessageDigest

scala> val hash = MessageDigest.getInstance("MD5").digest("1".getBytes("UTF-8"))
hash: Array[Byte] = Array(-60, -54, 66, 56, -96, -71, 35, -126, 13, -52, 80, -102, 111, 117, -124, -101)

scala> val hash = MessageDigest.getInstance("MD5").digest("1".getBytes("UTF-8")).length
hash: Int = 16
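
To reproduce the shell output above on the JVM side, hash "1\n", since echo appends the trailing newline (a minimal sketch):

scala> val md5hex = MessageDigest.getInstance("MD5").digest("1\n".getBytes("UTF-8")).map("%02x".format(_)).mkString
md5hex: String = b026324c6904b2a9cb4b88d6d61c81d1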


Sunday, 4 December 2016

create an OS-managed service using systemd

  1. UNIX System V / BSD init systems,
  2. upstart (2006), an event-based replacement for the traditional init daemon.
    Recent versions of Ubuntu use systemd; as far as I remember, previous versions used upstart.
  3. runit (2004),
  4. launchd in the macOS world.
PS: There's a lot of hate against systemd for going against the Unix philosophy, which goes:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.
Here's a thread which might be useful to dig into.

Fun time

I'm creating a streaming service in this example, which will simply keep appending events to a file. If I restart my machine, the service will start emitting events to the file right away.
Also, I can start or stop the service using systemctl start/stop serviceName. I used to use nohup <command> &, and to kill it I had to find the PID and then kill it by hand. I can get rid of all that crap with a systemd service.
STEP 1 - create streaming.service at /etc/systemd/system/streaming.service (enabling it in STEP 3 symlinks it into /etc/systemd/system/multi-user.target.wants/).
[Unit]                                                                                                                                           
Description=Streaming pipeline                                                                      

[Service]
# streaming.sh runs in the foreground and never forks,
# so Type=simple (the default) is the right choice here
Type=simple
ExecStart=/usr/local/bin/streaming.sh
TimeoutSec=infinity
Restart=always

[Install]
WantedBy=multi-user.target
STEP 2 - create a bash script at /usr/local/bin/streaming.sh and make it executable. Don't forget the #!/bin/bash shebang; without it systemd fails with an "Exec format error" (as seen in the journal output below).
#!/bin/bash
# append an event line in a tight loop (no sleep, so the log grows fast; see the PS at the end)
while true
do
  echo "Streaming an event at $(date)" >> /var/log/streaming.log
done
chmod 755 /usr/local/bin/streaming.sh
STEP 3 - reload the systemd daemon so that it picks up streaming.service, then enable the service
systemctl daemon-reload
systemctl enable streaming.service
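Note that enabling only wires the service to start at boot; to start it right away without rebooting:
systemctl start streaming.service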
STEP 4 - see the streaming job status
systemctl status streaming.service
I can use journalctl _PID=?? to debug if the service is not working, e.g.
journalctl _PID=5871
-- Logs begin at Mon 2016-11-21 16:36:41 PST, end at Tue 2016-12-06 14:34:56 PST. --
Nov 26 04:17:52 y0319t10971 sshd[5871]: Connection closed by 10.16.132.191 [preauth]
Dec 06 14:33:15 y0319t10971 systemd[5871]: Failed at step EXEC spawning /usr/local/bin/streaming.sh: Exec format error
or I can use the following command,
root@y0319t10971:~# journalctl -fu streaming.service
-- Logs begin at Mon 2016-11-21 16:36:41 PST. --
Dec 06 14:33:16 y0319t10971 systemd[1]: streaming.service: main process exited, code=exited, status=203/EXEC
Dec 06 14:33:16 y0319t10971 systemd[1]: Unit streaming.service entered failed state.
Dec 06 14:33:16 y0319t10971 systemd[1]: streaming.service failed.
Dec 06 14:34:28 y0319t10971 systemd[1]: Started Streaming pipeline.
Dec 06 14:34:28 y0319t10971 systemd[1]: Starting Streaming pipeline...
Dec 06 14:34:28 y0319t10971 systemd[1]: streaming.service: main process exited, code=exited, status=203/EXEC
Dec 06 14:34:28 y0319t10971 systemd[1]: Unit streaming.service entered failed state.
Dec 06 14:34:28 y0319t10971 systemd[1]: streaming.service failed.
Dec 06 14:44:58 y0319t10971 systemd[1]: Started Streaming pipeline.
Dec 06 14:44:58 y0319t10971 systemd[1]: Starting Streaming pipeline...
STEP 5 - see the events produced by the streaming.service
tail -f /var/log/streaming.log 
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
PS: Don't forget to stop the service, otherwise your storage will be full in a few days :)
ll /var/log/streaming.log --block-size=GB
-rw-r--r-- 1 root root 3GB Dec  6 13:51 /var/log/streaming.log
Also, the service would be running right after I reboot the machine.

Sunday, 27 November 2016

Playing with reactive streams with Akka

Recently I've been learning Akka Streams for fun. Streams are basically collections of data/events with no known end, so stream programming is useful when I want to process a huge data set that keeps producing events over time: whenever I see a new event on the stream, I do further processing on that event.
For example, take the stream of online orders at a retail store. Customers keep ordering (it's a stream), and once the store person sees a new order he looks up the ordered items and picks them up.
Once he has gathered all the items for one order, he sends them to another guy who packs them into a box and tells yet another guy to put a label on it. Which means there is a stream of (orders ready to pack) and then a stream of (orders ready to be labelled and shipped).
Finally all the labelled orders are put on the truck, so the truck or trailer is the final Sink here.
To visualize it,
online orders -> pick items -> pack order -> put label -> trailer -> | Done
Let's achieve it using akka streaming,
First, I will define the Order states,
import java.util.Date

case class Order(itemName: String, quantity: Int, at: Date)
case class Picked(itemName: String, quantity: Int, at: Date)
case class Packed(itemName: String, quantity: Int, at: Date)
case class Labelled(itemName: String, quantity: Int, at: Date)

Stream Source

A Source is a collection of elements which it keeps publishing/emitting.
scala> import akka.stream.scaladsl.{Source, Sink}

scala> val customerOrdersStream = Source(List(Order("DN black shirt - Triangle", 1, new Date()),
     |       Order("DN white shirt - classic", 2, new Date()),
     |       Order("DN red shirt - memento mori", 3, new Date())))
customerOrdersStream: akka.stream.scaladsl.Source[Order,akka.NotUsed] =
Source(SourceShape(StatefulMapConcat.out), CompositeModule [6611a9b6]
  Name: iterableSource
  Modules:
    (singleSource) GraphStage(SingleSource(List(Order(DN black shirt - Triangle,1,Sun Nov 27 13:05:38 PST 2016), Order(DN white shirt - classic,2,Sun Nov 27 13:05:38 PST 2016), Order(DN red shirt - memento mori,3,Sun Nov 27 13:05:38 PST 2016)))) [415e4363]
    (unnamed) [2cc00ee7] copy of GraphStage(StatefulMapConcat) [3b270282]
  Downstreams:
    single.out -> StatefulMapConcat.in
  Upstreams:
    StatefulMapConcat.in -> single.out
  MatValue: Atomic(singleSource[415e4363]))
Once the Source is defined, I need to define the processing steps it has to go through, which in this example are picking, packing and labelling. These are known as Flows.

Stream Flows

    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    val picking: Flow[Order, Picked, NotUsed] = Flow[Order].map(order => {
      println(s"\npicking ${order.itemName}")
      Picked(order.itemName, order.quantity, new Date())
    })

    val packing: Flow[Picked, Packed, NotUsed] = Flow[Picked].map(picked => {
      println(s"packing ${picked.itemName}")
      Packed(picked.itemName, picked.quantity, new Date())
    })

    val labelling: Flow[Packed, Labelled, NotUsed] = Flow[Packed].map(packed => {
      println(s"labelling ${packed.itemName}")
      Labelled(packed.itemName, packed.quantity, new Date())
    })
Then, once all the steps are done, I will send those customer orders to the trailer, which delivers the orders; so the trailer is the Sink here.

Stream Sink

A Sink is the final destination of the items or events flowing through the stream.
    import akka.Done
    import scala.concurrent.Future

    val trailer: Sink[Labelled, Future[Done]] =
      Sink.foreach[Labelled](order => println(order.itemName + " is in the trailer, on the way to Customer."))
Finally, once all the steps are defined individually, I need to assemble them into one pipeline, which is called a RunnableGraph (RunnableFlow in older versions of Akka Streams).
    val ordersFlow = customerOrdersStream
      .via(picking)
      .via(packing)
      .via(labelling)
      .to(trailer)
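A side note: .to(trailer) keeps the materialized value of the source, which is why run() below returns akka.NotUsed. If I wanted the Sink's Future[Done] instead (e.g. to know when the stream has finished), a small variation using Keep.right would look like this (a sketch, not part of the original flow):
    import akka.Done
    import akka.stream.scaladsl.Keep
    import scala.concurrent.Future

    // keep the sink's materialized value instead of the source's
    val ordersFlowMat = customerOrdersStream
      .via(picking).via(packing).via(labelling)
      .toMat(trailer)(Keep.right)

    // running it yields a Future[Done] that completes when the stream finishes
    // val done: Future[Done] = ordersFlowMat.run()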

Materializer config

I need to define an ActorSystem and an ActorMaterializer, which materializes the flows defined above into Reactive Streams Processors.
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem

scala> implicit val actorSystem = ActorSystem("orders-streaming")
actorSystem: akka.actor.ActorSystem = akka://orders-streaming

scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer

scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://orders-streaming,ActorMaterializerSettings(4,16,,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true),akka.dispatch.Dispatchers@77febdaf,Actor[akka://orders-streaming/user/StreamSupervisor-0#-1147523586],false,akka.stream.impl.SeqActorNameImpl@14deb7de)

Execute the defined stream flow

scala> ordersFlow.run()(materializer)
res1: akka.NotUsed = NotUsed

scala> 
picking DN black shirt - Triangle
packing DN black shirt - Triangle
labelling DN black shirt - Triangle
DN black shirt - Triangle is in the trailer, on the way to Customer.

picking DN white shirt - classic
packing DN white shirt - classic
labelling DN white shirt - classic
DN white shirt - classic is in the trailer, on the way to Customer.

picking DN red shirt - memento mori
packing DN red shirt - memento mori
labelling DN red shirt - memento mori
DN red shirt - memento mori is in the trailer, on the way to Customer.
As seen above, the orders are processed one at a time; in this case, say the retail store has only one picker, one packer and one labeller.
What if the retailer has, say, 4 pickers, 4 packers and 4 labellers? Can I parallelize the stream to increase the throughput?
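One common approach (a sketch, not from the original code; it assumes the picking work can run concurrently and that an ExecutionContext such as actorSystem.dispatcher is in scope) is mapAsync, which runs up to parallelism invocations of a Future-returning function at once while still preserving the order of elements downstream:

    import scala.concurrent.Future
    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    implicit val ec = actorSystem.dispatcher

    // up to 4 orders are picked concurrently, like a pool of 4 pickers
    val parallelPicking: Flow[Order, Picked, NotUsed] =
      Flow[Order].mapAsync(parallelism = 4) { order =>
        Future(Picked(order.itemName, order.quantity, new Date()))
      }

Swapping parallelPicking in for picking (and doing the same for packing and labelling) would let 4 workers handle each stage at once.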
