Commit 5f5010cd authored by Federico Mestrone
Browse files

Added Pubsub example and added helper for Bigtable

parent 9bced830
#!/usr/bin/env bash
# Compiles the project and launches StreamingPipeline through the Maven exec plugin.
# exec.args carries, in order: GCP project id, staging location, temp location, Pub/Sub topic.
# Every line inside the quoted exec.args ends with " \" so the arguments are joined by single
# spaces; without the continuation a raw newline would be embedded in the argument string.
mvn compile exec:java -e \
-Dexec.mainClass=gcp.cm.bigdata.adtech.dataflow.StreamingPipeline \
-Dexec.args="qwiklabs-gcp-56c3e61809c73e4d \
gs://abucket-for-codemotion/dataflow/staging/ \
gs://abucket-for-codemotion/dataflow/temp/ \
ad-impressions"
package gcp.cm.bigdata.adtech.dataflow;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.json.JSONObject;
import javax.xml.soap.Text;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Streaming Dataflow pipeline that reads ad impressions from a Pub/Sub topic, keeps the ones
 * whose {@code clicked} field is greater than zero, and writes the summed clicks per site and
 * per device model as text to Cloud Storage.
 *
 * <p>Command-line arguments, in order: GCP project id, staging location, temp location and,
 * optionally, the Pub/Sub topic (short name or full {@code projects/.../topics/...} path;
 * defaults to {@code ad-impressions}).
 */
public class StreamingPipeline {

    /** Topic used when no fourth argument is supplied. */
    private static final String DEFAULT_TOPIC = "ad-impressions";

    /**
     * Builds and runs the pipeline on the Dataflow runner, blocking until it finishes.
     *
     * @param args project id, staging location, temp location and optional topic
     * @throws IllegalArgumentException if fewer than three arguments are supplied
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException(
                    "Usage: StreamingPipeline <project> <stagingLocation> <tempLocation> [<topic>]");
        }
        String project = args[0];
        String topic = args.length > 3 ? args[3] : DEFAULT_TOPIC;

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(project);
        options.setStagingLocation(args[1]);
        options.setTempLocation(args[2]);
        options.setRunner(DataflowRunner.class);
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);

        // NOTE(review): the Pub/Sub read is unbounded, but no Window/trigger is applied before
        // the per-key sums and the TextIO writes below. Confirm against the Beam documentation
        // whether windowing and windowed writes must be added for this to run in streaming mode.
        PCollection<JSONObject> coll = p
                .apply("ReadFromPubsub",
                        PubsubIO.readMessagesWithAttributes().fromTopic(topicPath(project, topic)))
                .apply("ToJSON", MapElements
                        .into(TypeDescriptor.of(JSONObject.class))
                        // Decode explicitly as UTF-8 instead of relying on the worker's default charset.
                        .via((PubsubMessage msg) -> new JSONObject(
                                new String(msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8))))
                .apply("FilterNonClick", Filter.by((JSONObject obj) -> obj.getInt("clicked") > 0));

        writeClicksPerKey(coll, "siteId", "Site", "SiteIdAndClicks",
                "gs://abucket-for-codemotion/adtech/clicks-site-%s.csv");
        writeClicksPerKey(coll, "deviceModel", "Device", "DeviceAndClicks",
                "gs://abucket-for-codemotion/adtech/clicks-device-%s.csv");

        p.run().waitUntilFinish();
    }

    /**
     * Returns the fully qualified topic path expected by {@code PubsubIO}; a value that already
     * starts with {@code projects/} is passed through unchanged.
     */
    private static String topicPath(String project, String topic) {
        return topic.startsWith("projects/")
                ? topic
                : String.format("projects/%s/topics/%s", project, topic);
    }

    /**
     * Sums the {@code clicked} values per distinct value of {@code keyField} and writes one
     * "{@code <label> <key> => <total>}" line per key to {@code outputPath}.
     */
    private static void writeClicksPerKey(PCollection<JSONObject> clicks, String keyField,
            String label, String extractStepName, String outputPath) {
        clicks
                .apply(extractStepName, MapElements
                        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                        .via((JSONObject obj) -> KV.of(obj.getString(keyField), obj.getInt("clicked"))))
                .apply("Sum" + label + "Clicks", Sum.integersPerKey())
                .apply("Make" + label + "Strings", MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Integer> kv) ->
                                String.format("%s %s => %s", label, kv.getKey(), kv.getValue())))
                .apply(TextIO.write().withoutSharding().to(outputPath));
    }
}
......@@ -51,6 +51,11 @@
<artifactId>bigtable-hbase-1.x</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.66.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package gcp.cm.bigdata.adtech;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
 * Holds the single HBase/Bigtable {@link Connection} shared by the application.
 *
 * <p>Both methods are {@code synchronized} so that concurrent callers cannot each create their
 * own connection during lazy initialisation, and so that closing and re-opening do not
 * interleave.
 */
public class BigtableHelper {

    /** Lazily created shared connection; {@code null} until first use and after closing. */
    private static Connection connection;

    /**
     * Returns the shared connection, creating it on first use or after it has been closed.
     *
     * @return the open connection, or {@code null} if it could not be created (the failure is
     *         printed to standard error)
     */
    public static synchronized Connection getConnection() {
        if (connection == null || connection.isClosed()) {
            try {
                connection = ConnectionFactory.createConnection();
            } catch (IOException e) {
                // Never hand out a stale, closed connection when re-creation fails.
                connection = null;
                e.printStackTrace();
            }
        }
        return connection;
    }

    /**
     * Closes the shared connection if one exists. The cached reference is always cleared so a
     * later {@link #getConnection()} opens a fresh connection instead of returning a closed one.
     */
    public static synchronized void closeConnection() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                connection = null;
            }
        }
    }
}
......@@ -39,6 +39,8 @@ public class ObjectifyConfig {
@Override
public void contextDestroyed(ServletContextEvent sce) {
BigtableHelper.closeConnection();
PubsubHelper.shutdownPublisher();
}
}
......
package gcp.cm.bigdata.adtech;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.ProjectTopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 * Holds the single Pub/Sub {@link Publisher} shared by the application.
 *
 * <p>Both methods are {@code synchronized} so that concurrent callers cannot each build their
 * own publisher during lazy initialisation.
 */
public class PubsubHelper {

    /** Project that owns the topic. */
    private static final String PROJECT_ID = "qwiklabs-gcp-56c3e61809c73e4d";

    /** Topic the publisher sends to. */
    private static final String TOPIC_ID = "ad-impressions";

    /** Lazily created shared publisher; {@code null} until first use and after shutdown. */
    private static Publisher publisher;

    /**
     * Returns the shared publisher, building it on first use or after a shutdown.
     *
     * @return the publisher, or {@code null} if it could not be built (the failure is printed
     *         to standard error)
     */
    public static synchronized Publisher getPublisher() {
        if (publisher == null) {
            try {
                ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, TOPIC_ID);
                publisher = Publisher.newBuilder(topicName).build();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return publisher;
    }

    /**
     * Shuts the shared publisher down, waiting up to one minute for termination. The cached
     * reference is always cleared so a later {@link #getPublisher()} builds a new publisher
     * instead of returning one that has been shut down.
     */
    public static synchronized void shutdownPublisher() {
        if (publisher != null) {
            try {
                publisher.shutdown();
                publisher.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                publisher = null;
            }
        }
    }
}
package gcp.cm.bigdata.adtech.controller;
import gcp.cm.bigdata.adtech.BigtableHelper;
import gcp.cm.bigdata.adtech.domain.Impression;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
......@@ -18,16 +19,6 @@ import static com.googlecode.objectify.ObjectifyService.ofy;
@CrossOrigin
public class BigtableIngestController {
private static Connection connection;
static {
try {
connection = ConnectionFactory.createConnection();
} catch (IOException e) {
e.printStackTrace();
}
}
@RequestMapping(path = "impression", method = RequestMethod.POST)
public void ingest(@RequestBody Impression entry) {
byte[] CF1 = Bytes.toBytes("I"); // column family
......@@ -37,7 +28,7 @@ public class BigtableIngestController {
byte[] CF5 = Bytes.toBytes("C"); // column family
Table table = null;
try {
table = connection.getTable(TableName.valueOf("impressions"));
table = BigtableHelper.getConnection().getTable(TableName.valueOf("impressions"));
Put p = new Put(Bytes.toBytes(String.format("%d#%d#%d#%d", entry.getSiteCategory(), entry.getAppCategory(), entry.getDeviceType(), entry.getHour())));
p.addColumn(CF1, Bytes.toBytes("ID"), Bytes.toBytes(entry.getImpressionId()));
p.addColumn(CF1, Bytes.toBytes("CLICK"), Bytes.toBytes(entry.getClicked()));
......
package gcp.cm.bigdata.adtech.controller;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import gcp.cm.bigdata.adtech.PubsubHelper;
import org.springframework.web.bind.annotation.*;
/**
 * REST controller that forwards impression JSON, as received, to Pub/Sub through the publisher
 * provided by {@link PubsubHelper}.
 */
@RestController
@RequestMapping("pubsub")
@CrossOrigin
public class PubsubIngestController {

    /**
     * Publishes the request body as the data of a single Pub/Sub message and reports the
     * outcome of the asynchronous publish on standard output.
     *
     * @param entryJson raw request body; it is sent UTF-8 encoded without being parsed
     */
    @RequestMapping(path = "impression", method = RequestMethod.POST)
    public void ingest(@RequestBody String entryJson) {
        PubsubMessage message = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(entryJson))
                .build();

        ApiFutureCallback<String> outcomeReporter = new ApiFutureCallback<String>() {
            @Override
            public void onSuccess(String messageId) {
                // The server-assigned message id is unique within the topic.
                System.out.println("Message published successfully : " + messageId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error publishing message : " + throwable.getMessage());
            }
        };

        ApiFuture<String> pending = PubsubHelper.getPublisher().publish(message);
        // The callback runs on whichever thread completes the future.
        ApiFutures.addCallback(pending, outcomeReporter, MoreExecutors.directExecutor());
    }
}
\ No newline at end of file
......@@ -2,7 +2,8 @@ package gcp.cm.bigdata.adtech.controller;
public enum TargetDataStore {
CLOUD_DATASTORE("datastore"),
CLOUD_BIGTABLE("bigtable");
CLOUD_BIGTABLE("bigtable"),
CLOUD_PUBSUB("pubsub");
private String pathPrefix;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment