Commit 8a52c2ae authored by Federico Mestrone
Browse files

Various fixes for streaming flow

parent 5f5010cd
......@@ -3,5 +3,5 @@ mvn compile exec:java -e \
-Dexec.mainClass=gcp.cm.bigdata.adtech.dataflow.StreamingPipeline \
-Dexec.args="qwiklabs-gcp-56c3e61809c73e4d \
gs://abucket-for-codemotion/dataflow/staging/ \
gs://abucket-for-codemotion/dataflow/temp/
gs://abucket-for-codemotion/dataflow/temp/ \
ad-impressions"
package gcp.cm.bigdata.adtech.dataflow;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
import org.json.JSONObject;
import java.io.*;
import java.util.Map;
public class JSONObjectCoder extends AtomicCoder<JSONObject> {
@Override
public void encode(JSONObject value, OutputStream outStream) throws CoderException, IOException {
new ObjectOutputStream(outStream).writeObject(value.toMap());
}
@Override
public JSONObject decode(InputStream inStream) throws CoderException, IOException {
try {
return new JSONObject((Map)new ObjectInputStream(inStream).readObject());
} catch (ClassNotFoundException e) {
throw new CoderException(e);
}
}
}
......@@ -3,6 +3,8 @@ package gcp.cm.bigdata.adtech.dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
......@@ -11,10 +13,13 @@ import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.json.JSONObject;
import javax.xml.soap.Text;
......@@ -33,14 +38,17 @@ public class StreamingPipeline {
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoderForClass(JSONObject.class, new JSONObjectCoder());
PCollection<JSONObject> coll = p
.apply("ReadFromPubsub",
PubsubIO.readMessagesWithAttributes().fromTopic("ad-impressions"))
PubsubIO.readMessagesWithAttributes().fromTopic("projects/" + args[0] + "/topics/ad-impressions"))
.apply("ToJSON", MapElements
.into(TypeDescriptor.of(JSONObject.class))
.via((PubsubMessage msg) -> new JSONObject(new String(msg.getPayload())))
)
.apply("FilterNonClick", Filter.by((JSONObject obj) -> obj.getInt("clicked") > 0))
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
;
coll.apply("SiteIdAndClicks", MapElements
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment