Sunday, 4 December 2016

create an OS-managed service using systemd

  1. the traditional init systems of UNIX System V and BSD,
  2. Upstart, 2006, an event-based replacement for the traditional init daemon.
    Recent versions of Ubuntu use systemd; as far as I remember, earlier versions of Ubuntu used Upstart.
  3. runit, 2004
  4. launchd, in the macOS world.
PS. There's a lot of hate against systemd, it being against the Unix philosophy, which goes:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.
Here's a thread which might be useful to dig into.

Fun time

I'm creating a streaming service in this example, which will simply keep writing events to a file. If I restart my machine, the service will start emitting events to the file right away.
I can also start or stop the service using systemctl start/stop serviceName. I used to use nohup <command> &, and to kill it I had to get the PID and then kill it. I can get rid of all that crap with a systemd service.
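For comparison, the old nohup workflow looked roughly like this (a sketch using sleep as a stand-in for the real long-running command):

```shell
# The old way: background the job with nohup, remember the PID,
# and clean it up by hand later.
nohup sleep 100 >/dev/null 2>&1 &
PID=$!
echo "started $PID"
kill "$PID"               # manual stop, every single time
wait "$PID" 2>/dev/null   # reap the killed process
echo "stopped $PID"
```

With a systemd unit, all of this bookkeeping is replaced by systemctl start/stop.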
STEP 1 - create streaming.service at /etc/systemd/system/streaming.service (systemctl enable will later create the symlink in /etc/systemd/system/multi-user.target.wants/ for us).
[Unit]
Description=Streaming pipeline

[Service]
Type=simple
ExecStart=/usr/local/bin/streaming.sh
TimeoutSec=infinity
Restart=always

[Install]
WantedBy=multi-user.target
STEP 2 - create a bash script at /usr/local/bin/streaming.sh and make it executable. Don't forget the #!/bin/bash shebang; without it systemd fails with the "Exec format error" seen in the journal below.
#!/bin/bash
while true
 do
    echo "Streaming an event at `date`" >> /var/log/streaming.log
 done
chmod 755 /usr/local/bin/streaming.sh
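Note that the loop above has no delay, so it writes events as fast as the CPU allows (hence the multi-gigabyte log later). A throttled variant of the same idea, writing to a temporary path for illustration:

```shell
#!/bin/bash
# Throttled variant of streaming.sh: one event per second,
# appending to a temporary file instead of /var/log.
LOG=$(mktemp)
for i in 1 2 3; do            # the real script would use: while true
  echo "Streaming an event at $(date)" >> "$LOG"
  sleep 1
done
wc -l < "$LOG"                # 3 events written
```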
STEP 3 - reload the systemd daemon so that it loads streaming.service, then enable and start the service
systemctl daemon-reload
systemctl enable streaming.service
systemctl start streaming.service
STEP 4 - see the streaming job status
systemctl status streaming.service
I can use journalctl _PID=?? to debug if the service is not working, e.g.
journalctl _PID=5871
-- Logs begin at Mon 2016-11-21 16:36:41 PST, end at Tue 2016-12-06 14:34:56 PST. --
Nov 26 04:17:52 y0319t10971 sshd[5871]: Connection closed by 10.16.132.191 [preauth]
Dec 06 14:33:15 y0319t10971 systemd[5871]: Failed at step EXEC spawning /usr/local/bin/streaming.sh: Exec format error
or use the following command:
root@y0319t10971:~# journalctl -fu streaming.service
-- Logs begin at Mon 2016-11-21 16:36:41 PST. --
Dec 06 14:33:16 y0319t10971 systemd[1]: streaming.service: main process exited, code=exited, status=203/EXEC
Dec 06 14:33:16 y0319t10971 systemd[1]: Unit streaming.service entered failed state.
Dec 06 14:33:16 y0319t10971 systemd[1]: streaming.service failed.
Dec 06 14:34:28 y0319t10971 systemd[1]: Started Streaming pipeline.
Dec 06 14:34:28 y0319t10971 systemd[1]: Starting Streaming pipeline...
Dec 06 14:34:28 y0319t10971 systemd[1]: streaming.service: main process exited, code=exited, status=203/EXEC
Dec 06 14:34:28 y0319t10971 systemd[1]: Unit streaming.service entered failed state.
Dec 06 14:34:28 y0319t10971 systemd[1]: streaming.service failed.
Dec 06 14:44:58 y0319t10971 systemd[1]: Started Streaming pipeline.
Dec 06 14:44:58 y0319t10971 systemd[1]: Starting Streaming pipeline...
STEP 5 - see the events produced by the streaming.service
tail -f /var/log/streaming.log 
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
Streaming an event at Fri Dec  2 11:27:21 PST 2016
PS. Don't forget to stop the service, otherwise your storage will be full in a few days :)
ll /var/log/streaming.log --block-size=GB
-rw-r--r-- 1 root root 3GB Dec  6 13:51 /var/log/streaming.log
Also, since the service is enabled, it will start automatically right after I reboot the machine.

Sunday, 27 November 2016

Playing with reactive streams with akka

Recently I've been learning Akka streams for fun. Streams are basically collections of data/events with no known end, so stream programming is useful when I want to process a huge data set that keeps producing events over time. Whenever I see a new event on the stream, I do further processing on that event.
For example, consider the stream of online orders at a retail store. Customers keep ordering, so it's a stream; once the store person sees a new order, he looks up the ordered items and picks them up.
Once he has gathered all the items for one order, he sends them to another guy who packs them into a box and tells yet another guy to put a label on it. Which means there is a stream of (orders ready to pack) and then a stream of (orders ready to be labelled and shipped).
Finally all the labelled orders are put on the truck. So the truck or trailer is the final Sink here.
To visualize it,
online orders -> pick items -> pack order -> put label -> trailer -> | Done
Let's achieve it using akka streaming,
First, I will define the Order states,
import java.util.Date

case class Order(itemName: String, quantity: Int, at: Date)
case class Picked(itemName: String, quantity: Int, at: Date)
case class Packed(itemName: String, quantity: Int, at: Date)
case class Labelled(itemName: String, quantity: Int, at: Date)

Stream Source

A Source is the collection of elements which the stream keeps publishing/emitting.
scala> import akka.stream.scaladsl.{Source, Sink}

scala> val customerOrdersStream = Source(List(Order("DN black shirt - Triangle", 1, new Date()),
     |       Order("DN white shirt - classic", 2, new Date()),
     |       Order("DN red shirt - memento mori", 3, new Date())))
customerOrdersStream: akka.stream.scaladsl.Source[Order,akka.NotUsed] =
Source(SourceShape(StatefulMapConcat.out), CompositeModule [6611a9b6]
  Name: iterableSource
  Modules:
    (singleSource) GraphStage(SingleSource(List(Order(DN black shirt - Triangle,1,Sun Nov 27 13:05:38 PST 2016), Order(DN white shirt - classic,2,Sun Nov 27 13:05:38 PST 2016), Order(DN red shirt - memento mori,3,Sun Nov 27 13:05:38 PST 2016)))) [415e4363]
    (unnamed) [2cc00ee7] copy of GraphStage(StatefulMapConcat) [3b270282]
  Downstreams:
    single.out -> StatefulMapConcat.in
  Upstreams:
    StatefulMapConcat.in -> single.out
  MatValue: Atomic(singleSource[415e4363]))
Once the Source is defined, I need to define the processing steps it has to go through, which in this example are picking, packing and labelling. These are known as Flows.

Stream Flows

    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    val picking : Flow[Order, Picked, NotUsed] = Flow[Order].map(order => {
      println(s"\npicking ${order.itemName}")
      Picked(order.itemName, order.quantity, new Date())
    })

    val packing : Flow[Picked, Packed, NotUsed] = Flow[Picked].map(picked => {
      println(s"packing ${picked.itemName}")
      Packed(picked.itemName, picked.quantity, new Date())
    })

    val labelling : Flow[Packed, Labelled, NotUsed] = Flow[Packed].map(packed => {
      println(s"labelling ${packed.itemName}")
      Labelled(packed.itemName, packed.quantity, new Date())
    })
Then, once all the steps are completed, those customer orders are sent to the trailer, which delivers them; so the trailer is the Sink here.

Stream Sink

Sink is a final destination of the items or events flowing through the stream.
    val trailer: Sink[Labelled, Future[Done]] = Sink.foreach(order => println(order.itemName + " is in the trailer, on the way to Customer."))
Finally, once all the steps are defined individually, I need to assemble them into one pipeline, which is called a RunnableGraph (RunnableFlow in older Akka versions).
    val ordersFlow = customerOrdersStream
      .via(picking)
      .via(packing)
      .via(labelling)
      .to(trailer)

Materializer config

I need to define the ActorSystem and ActorMaterializer which materializes the Flows defined above into reactive Processors.
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem

scala> implicit val actorSystem = ActorSystem("orders-streaming")
actorSystem: akka.actor.ActorSystem = akka://orders-streaming

scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer

scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://orders-streaming,ActorMaterializerSettings(4,16,,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true),akka.dispatch.Dispatchers@77febdaf,Actor[akka://orders-streaming/user/StreamSupervisor-0#-1147523586],false,akka.stream.impl.SeqActorNameImpl@14deb7de)

Execute the defined stream flow

scala> ordersFlow.run()(materializer)
res1: akka.NotUsed = NotUsed

scala> 
picking DN black shirt - Triangle
packing DN black shirt - Triangle
labelling DN black shirt - Triangle
DN black shirt - Triangle is in the trailer, on the way to Customer.

picking DN white shirt - classic
packing DN white shirt - classic
labelling DN white shirt - classic
DN white shirt - classic is in the trailer, on the way to Customer.

picking DN red shirt - memento mori
packing DN red shirt - memento mori
labelling DN red shirt - memento mori
DN red shirt - memento mori is in the trailer, on the way to Customer.
As seen above, the orders are processed one at a time. In this case, say the retailer has only 1 picker, 1 packer and 1 labeller.
What if the retailer has, say, 4 pickers, 4 packers and 4 labellers? Can I parallelise the stream to increase the throughput?
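One option (a sketch, not part of the REPL session above) is mapAsync, which runs a stage's work inside Futures with a bounded parallelism while still preserving the downstream order of elements. Assuming the Order/Picked classes and actorSystem defined earlier:

```scala
import java.util.Date
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Sketch: 4 "pickers" working concurrently. mapAsync emits results
// in the original order even if the Futures complete out of order.
val parallelPicking: Flow[Order, Picked, NotUsed] =
  Flow[Order].mapAsync(parallelism = 4) { order =>
    implicit val ec = actorSystem.dispatcher
    Future {
      println(s"picking ${order.itemName}")
      Picked(order.itemName, order.quantity, new Date())
    }
  }
```

Swapping this in for picking via .via(parallelPicking) would be the analogue of hiring 4 pickers; the same trick applies to the packing and labelling stages.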


Saturday, 12 November 2016

playing with the java regex api



// simple event

scala> val event = "{\"timeMillis\": 123456}"
event: String = {"timeMillis": 123456}

scala> import java.util.regex.Pattern
import java.util.regex.Pattern

// has pattern {"timeMillis" : number}
scala> Pattern.matches("\\{\"timeMillis\": [0-9]*\\}", event)
res3: Boolean = true

// has pattern anything followed by timeMillis followed by anything
scala> Pattern.matches(".*timeMillis.*", event)
res4: Boolean = true
//complex event
scala> val event = "{\"timeMillis\": 123456, \"body\":\"some body\"}"
event: String = {"timeMillis": 123456, "body":"some body"}

// has pattern anything followed by "timeMillis" followed by anything
scala> Pattern.matches(".*\"timeMillis\".*", event)
res5: Boolean = true
has pattern { followed by white space, then "timeMillis", then anything
scala> val event = "{ \"timeMillis\": 123456, \"body\":\"some body\"}"
event: String = { "timeMillis": 123456, "body":"some body"}

scala> Pattern.matches("\\{\\s*\"timeMillis\"", event)
res8: Boolean = false

scala> Pattern.matches("\\{\\s*\"timeMillis\".*", event)
res9: Boolean = true
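Note that Pattern.matches requires the whole input to match, which is why the wrapping .* was needed above. To check for a substring without the .*, compile the pattern and use Matcher#find instead:

```scala
import java.util.regex.Pattern

val event = "{ \"timeMillis\": 123456, \"body\":\"some body\"}"

// matches() anchors the pattern to the entire input...
Pattern.matches("\"timeMillis\"", event)                 // false
// ...while Matcher#find scans for a match anywhere in the input.
Pattern.compile("\"timeMillis\"").matcher(event).find()  // true
```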



Monday, 7 November 2016

create compact log events, each on a separate line, with log4j2 JSONLayout


I am currently working on shipping application logs to an elasticsearch database, mainly so that an http request's traversal through multiple services can be tracked down by its requestId.

I am using a flume agent to read the application log line by line, modify a few json elements and publish the events to elasticsearch.

I first updated my application log to be in json format, as flume easily recognizes json events. I am using log4j2 for that.

BUT, I found out that flume cannot read a multi-line json object like this one:

{
  "timeMillis" : 1474611652491,
  "thread" : "main",
  "level" : "DEBUG",
  "loggerName" : "suppliesLogger",
  "message" : "I'm Hunter Thomson and I'm alive.",
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId" : 1,
  "threadPriority" : 5
}

So, instead of writing awkward logic to read a multi-line json object on the flume side, I updated my log4j2 config so the application itself writes each log event on one line.

The config is as below,
{
  "configuration": {
    "name": "logggg",
    "packages" : "org.apache.logging",
    "appenders": {
      "RollingFile": {
        "name":"rollingStone",
        "fileName":"supply_chain_rolled.log",
        "filePattern":"%d{MM-dd-yy-HH-mm-ss}-%i.log.gz",
        "JSONLayout": {
          "complete" : false,
          "compact" : true,
          "eventEol" : true
        },
        "Policies": {
          "SizeBasedTriggeringPolicy": {
            "size":"10 MB"
          }
        },
        "DefaultRolloverStrategy": {
          "max":"10"
        }
      }
    },
    "loggers": {
      "root": {
        "level":"debug",
        "appender-ref": {
          "ref":"rollingStone"
        }
      }
    }
  }
}


I basically needed to make the json objects compact with compact: true. But that alone writes all the events on one single line.

So I had to add an EOL after each event with eventEol: true.

The application log after compacting with EOL is:
{"timeMillis":1478588550167,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"I'm Hunter Thomson","endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}
{"timeMillis":1478588550569,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"artist=porcupine tree,address=UK","endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}
{"timeMillis":1478588550571,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"Exception occured ","thrown":{"commonElementCount":0,"localizedMessage":"some exception","message":"some exception","name":"java.lang.Exception","extendedStackTrace":[{"class":"org.apache.logging.SupplyChainLogger","method":"main","file":"SupplyChainLogger.java","line":17,"exact":true,"location":"classes/","version":"?"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke0","file":"NativeMethodAccessorImpl.java","line":-2,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke","file":"NativeMethodAccessorImpl.java","line":62,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"sun.reflect.DelegatingMethodAccessorImpl","method":"invoke","file":"DelegatingMethodAccessorImpl.java","line":43,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"java.lang.reflect.Method","method":"invoke","file":"Method.java","line":498,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"com.intellij.rt.execution.application.AppMain","method":"main","file":"AppMain.java","line":147,"exact":true,"location":"idea_rt.jar","version":"?"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}
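With one event per line, a consumer (flume or anything else) can treat the file as newline-delimited json and handle each line independently; a minimal plain-Scala sketch with made-up events:

```scala
// Two compact log events, one per line, shaped like what JSONLayout
// produces with compact=true and eventEol=true.
val log =
  """{"timeMillis":1478588550167,"level":"DEBUG","message":"I'm Hunter Thomson"}
    |{"timeMillis":1478588550569,"level":"DEBUG","message":"artist=porcupine tree,address=UK"}""".stripMargin

// Splitting on newlines yields complete, independent json objects.
val events = log.split("\n")
println(events.length)                           // 2
println(events.forall(_.contains("timeMillis"))) // true
```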


Resource

http://logging.apache.org/log4j/2.0/log4j-core/apidocs/org/apache/logging/log4j/core/layout/JsonLayout.html