Cassandra 테이블을 스파크 데이터와 join 하고 싶을때 


https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md


datastax , 쉽게 생각하면 카산드라 벤더에서 만든 스파크 드라이버를 이용 


val createDDL = """CREATE TEMPORARY VIEW/TABLE words USING org.apache.spark.sql.cassandra OPTIONS ( table "words", keyspace "test", cluster "Test Cluster", pushdown "true")""" spark.sql(createDDL) // Creates Catalog Entry registering an existing Cassandra Table

View 또는 TABLE로 맵핑해서 SparkSQL로 사용




저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


아파츠 재플린 , apache Zepplin아파츠 재플린 , apache Zepplin




몰랐는데, 재플린에서도 Code Assistant 기능이 있었네요..


사용법


sc를 치고  CTRL 키를 누른상태에서 . 을 누르면 어시스턴스 기능이 나옵니다. 


아파츠 재플린 , apache Zepplin아파츠 재플린 , apache Zepplin



하 이렇게 좋은걸 모르고 있었다니... 



저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

최근 클러스터 환경을  HDP(Hortonworks Data Platform) 2.5.3.0 -> HDP 2.6.1.0으로 올리면서


Spark 환경을 1.6 -> 2.1 로 바꾸었습니다.



사실 스칼라도 해보겠다고 두꺼운 Programming in Scala 3판도 샀지만..... 이미 초심은..


그러던 도중 ... 


분명히 Zepplien 에서 1.6에서 Parquet 파일을 Table로 저장할때, 


sqlContext.parquetFile("/tmp/extracted").sveAsTable("step01");


로 했던것 같은데.. Spark를 2버전으로 바꾸니 saveAsTable is not a member of org.apache.spark.sql.DataFrame


다음과 같은 오류가 나네요 .. 아마 내부 API나 어떤 변경이 있겠지만.. 역시 구글신 


sqlContext.parquetFile("/tmp/extracted").write.format("parquet").saveAsTable("step01")


다음으로 변경하니 , 제대로 저장이 되네요.







저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

Spark에서 paruqet 압축 알고리즘을 찾다가. 

분명히 두가지 방법중 한가지 방법이면 된다고 하는것 같은데


sqlContext.setConf("spark.sql.psqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")


저는 이것이 동작하네요 
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")

사용하는 화경은 HDP 2.5 Spark 1.6 입니다


저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License



Parquet + Spark 조합을 사용하고 있습니다


SparkSQL로 처리하기가 애매한 상황이라.  

직접 코딩좀 할일이 있어서 Scala를 만지는데 ...


1. sbt에 잘몰라서 설정하는데 고생

2. sbt와 스칼라버전과 build.sbt 의 %,%% 차이를 몰라서 고생

3. 책을 보고 있는데, 자바하고 문법이 비슷한것 같은데

   이제는 파이썬에 너무 익숙해져서 스칼라가 눈에 잘 안들어오는...



저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
맨날 HDP 에 설치가 잘된 제플린을 사용하다보니,,

수동으로 제플린을 사용하려고 하니 HIVE를 사용하려고 하니 다음과 같은 오류가 발생합니다.

Prefix not found.

paragraph_1493986135331_752263516's Interpreter hive not found
org.apache.zeppelin.interpreter.InterpreterException: paragraph_1493986135331_752263516's Interpreter hive not found at org.apache.zeppelin.notebook.Note.run(Note.java:605) at org.apache.zeppelin.socket.NotebookServer.persistAndExecuteSingleParagraph(NotebookServer.java:1641) at org.apache.zeppelin.socket.NotebookServer.runAllParagraphs(NotebookServer.java:1588) at org.apache.zeppelin.socket.NotebookServer.onMessage(NotebookServer.java:268) at org.apache.zeppelin.socket.NotebookSocket.onWebSocketText(NotebookSocket.java:59) at org.eclipse.jetty.websocket.common.events.JettyListenerEventDriver.onTextMessage(JettyListenerEventDriver.java:128) at org.eclipse.jetty.websocket.common.message.SimpleTextMessage.messageComplete(SimpleTextMessage.java:69) at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.appendMessage(AbstractEventDriver.java:65) at org.eclipse.jetty.websocket.common.events.JettyListenerEventDriver.onTextFrame(JettyListenerEventDriver.java:122) at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:161) at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:309) at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:214) at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:220) at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:258) at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.readParse(AbstractWebSocketConnection.java:632) at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:480) at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) at java.lang.Thread.run(Thread.java:745)


제가 사용한 제플린 버전은 1.7.1 버전이고 이유는 간단합니다.

HIVE 인터프리터가 없기 때문입니다.


설정방법은 여기 친절하게 나와있습니다.


https://zeppelin.apache.org/docs/0.7.1/interpreter/hive.html



설명은 간단하지만,, 분명히 HDP에서 제플린으로 HIVE사용할때는 %hive 사용했던것 같은데..


제플린을 오래 사용하지 않아서 jdbc로 통합된것 같더군요


먼저 Hadoop 홈폴더에 보면 share 폴더 아래 hadoop 아래 common이라는 폴더가 있을겁니다.

여기서 hadoop-common을 제플린 폴더의 interpreter 폴더 jdbc 폴더 안에 복사합니다


그다음 hive 홈폴더에서 lib 폴더에 보면 hive-jdbc-standalone이라는 jar 파일이 있습니다 

마찬가지로 제플린 interpreter 폴더 jdbc 에 복사합니다


그다음 제플린 UI에서 인터프리터를 여신다음 여기서 jdbc를 검색합니다

그다음 jdbc에 다음을 추가합니다

hive.driver   = org.apache.hive.jdbc.HiveDriver

hive.password = 자신의 설정

hive.url =자신의 설정

hive.user =자신의 설정


그다음 제플린 데몬을 제시작하면


%hive(jdbc) 하신다음 바로 사용이 가능합니다.





....추가적으로 지금 제플린에 yarn-client 모드로 spark 엮고 있는데, 이거 제대로 되면 방법도 올려보겠습니다.


맨날 벤더 하둡 쓰려다가 ㅠㅠㅠ self-deployed 하둡 쓰려니까 어렵네요 ㅠㅠ 























저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License


twitter scrooge 를 


spark scala 코드를 만지고 있는데, 다음과 같은 오류가 발생한다면


build.sbt에 아래를 추가하시면 될것 같습니다.


resolvers ++= Seq(
"Twitter Maven Repo" at "http://maven.twttr.com"
)


잘은 모르겟지만 libthrift 저게 maven centeral repo에는 업다고 하네요 



저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

잠깐동안 예제를 따라해보면서 해본 느낌은.. 


잘 모르겠지만 엄청 간단합니다. 


먼저 Parallel Python에서 http://www.parallelpython.com/content/view/18/32/ 에서 다운 받아서, pp를 다운받아서, 돌아갈 머신과 마스터 노드에 python setup.py install 하면 끝..


그리고 사용방법은 계산노드(slave)에서 ppserver.py -a(auto discovery) 하면 끝.. 물론 포트를 지정해 준다면 -p 옵션을 사용하면됩니다. 


그런다음 마스터 노드에서 다음과 같은 방법으로 하면됩니다.  (지금 같은경우는 1master node, 1slave node로 구성된 케이스)



import sys,thread
import pp
class myTest:
def __init__(self):
self.value=0
self.lock = thread.allocate_lock()

def result(self, value):
self.lock.acquire()
self.value += value
self.lock.release()


def mysum(n):
result=0
for i in range(1,1000):
result+=i
return result



sum = myTest()

ppserver= ppservers=("*",)

job_server = pp.Server(ppservers=ppservers)
job1 = job_server.submit(mysum,(100,),callback=sum.result)

job_server.print_stats()

job_server.wait()

print "result is "+str(sum.value)

job_server.print_stats()

#ppservers = ("10.0.0.1",)


slave서버를 입력 하는 방법인데 ppserver("*",) 이 방법은 Master노드와 같은 대역에 있을대 저렇게 하면, 자동으로 인식이됨 저 코드의 경우 1master node에서 1 slave노드에게 계산하라고 해서 callback 한 결과를 보여주고 있습니다. 


Job execution statistics:

 job count | % of all jobs | job time sum | time per job | job server

         1 |        100.00 |       0.0000 |     0.000000 | local

Time elapsed since server creation 0.00300002098083

1 active tasks, 8 cores


result is 499500

Job execution statistics:

 job count | % of all jobs | job time sum | time per job | job server

         1 |        100.00 |       0.0010 |     0.001000 | local

Time elapsed since server creation 0.00399994850159

0 active tasks, 8 cores



몇가지 궁금한게 생겼는데,, 결국에는 RabiitMq같은 Queue는 필요없을것 같은데,, 다중 노드는 어떻게 명령을 내려야되는거며, 어떻게 중간에 합치는게 따로 있나.  그리고 어짜피 쓰레드는 길(GIL)이 걸릴테니, 멀티프로세싱으로 하고 싶은데,, 이것도 되려나.. (답은 간단하더군요)

결론은 파이썬 신기합니다. . 

추가:: 방금전에 해본결과로는 결국에는 작동하는 로컬밖에 나오지 않았습니다. 
사실 몇가지 실험을 해보았는데, 결론은 다음과 같았다  이번에는 모든 10노드를 전부 기동시키고 
연산작업을... 한 1000개를 걸어보았습니다. 

Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        26 |         16.15 |      28.5401 |     1.097695 | 192.168.1.203:60000
        28 |         17.39 |      26.5884 |     0.949585 | 192.168.1.206:60000
         8 |          4.97 |       5.8869 |     0.735865 | 192.168.1.204:60000
         8 |          4.97 |       4.0038 |     0.500477 | 192.168.1.100:60000
         6 |          3.73 |       4.7484 |     0.791398 | 192.168.1.201:60000
        13 |          8.07 |      13.5061 |     1.038933 | 192.168.1.207:60000
        10 |          6.21 |      10.0583 |     1.005829 | 192.168.1.209:60000
        26 |         16.15 |      77.0360 |     2.962923 | local
        36 |         22.36 |      35.4551 |     0.984865 | 192.168.1.208:60000


결과는 다음과 같은데 10노드가 전부 활동하지는 않습니다... 예상에는 알아서 스케쥴링을 하거나, 현재의 상황을 찍었을때 찰나에는 다른 노드에서 일을하지 않고 있거나 그런것 같습니다. 

이 결과를 찾기 위해 이짓 저짓을 많이 해봤는데, 

어쩌면 이 당연한 사실을 찾기위해 몇시간을 순간 날렸네요.

결국은 파이썬이 신기하네요. 하지만 전 그래도 Java가 좋아요,,

스칼라도 공부해야되는데..





저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License

몇가지 처리해야할 작업이 있습니다. 


몇 가지 케이스에 대해 계속 테스트를 해보고 있지만. 이걸 Hadoop MapReduce로 처리하는것은 정말 성능이 안나오더군요. 흔히 말한는 반복적인 작업... 이걸 Storm, 또는 Spark를 통해 해결해 보고 싶지만. 현재 사정상 신규아키텍처를 도입하는데 문제가 있어서... 


물론,,, 현재 환경은 HDP(Hortonworks Data Platform)2.3 이기 때문에, 설치하거나 실행하는데, 문제는 아닙니다. Storm 같은경우 Topology를 만들면 되겠지만.. 약간 제가 생각하는 작업에는 불리할것 같고.. Spark쪽은 아직 제가 지식이 부족해서 시간대비 성과가 부족할것 같은 생각 때문입니다.


물론, 전 아직까지는 언어중에 Java가 좋지만, 요즘 왠만한 귀찮은 작업들은 Python으로 하고 있었는데(사실 과거에는 Perl도 배워보고 싶엇지만..) 너무 강력해서 계속 쓰게 되네요. 솔찍히 Java로 10줄 만들걸 파이선으로 3줄이면 표현이 되거든요..


사실 이렇게 하다가 고민이 생겼습니다. 다중 노드(현재 저 같은 경우는 10노드 + 추가 2노드) Produce Consumer 패턴?으로 로 작업을 병렬로 처리해보고 싶었는데, 이걸 프로그램으로 하나하나 만들기도 모하고,, 이러저런 고민이 많았는데,,


파이선이 해결해 주었네요 http://www.parallelpython.com/ 몇가지 고민은 Master Node에서 Slave 노드로 작업을 어떻게 넘겨주느냐 여부인데,, 자체적인 Queue가 있는것인지, 아님 외부 Queue를 이용해야하는지 봐봐야겠네요..


지금 RabbitMQ와 Kafaka를 동시에 보고 있는데,,, 제가 제대로 알고 있다면 Kafakfa는 Consumer에게 전달된 메세지가 어떤것인지에 대한 보장??꼭 UDP같은 성격이라... 의 문제가 있어서... 텍스트들을 보다 보면,, 케이스에 따라서는 RabbitMQ를 추천하시더군요 


일단은 이번포스팅의 목적은 파이선에서 병렬 처리를 쉽게 해주는 방법이 있는데,, 그것은 parallel python 입니다.


파이썬 짱짱,, 

저작자 표시 비영리 변경 금지
신고
크리에이티브 커먼즈 라이선스
Creative Commons License
1

+ Recent posts

티스토리 툴바