| package main |
| |
| import ( |
| "bufio" |
| "encoding/base64" |
| "encoding/json" |
| "fmt" |
| "log" |
| "net" |
| "net/http" |
| "net/textproto" |
| "os" |
| "strings" |
| |
| pubsub "google.golang.org/api/pubsub/v1beta1" |
| ) |
| |
| const USAGE = `Available arguments are: |
| PROJ list_topics |
| PROJ create_topic TOPIC |
| PROJ delete_topic TOPIC |
| PROJ list_subscriptions |
| PROJ create_subscription SUBSCRIPTION LINKED_TOPIC |
| PROJ delete_subscription SUBSCRIPTION |
| PROJ connect_irc TOPIC SERVER CHANNEL |
| PROJ pull_messages SUBSCRIPTION |
| ` |
| |
| type IRCBot struct { |
| server string |
| port string |
| nick string |
| user string |
| channel string |
| conn net.Conn |
| tpReader *textproto.Reader |
| } |
| |
| func NewIRCBot(server, channel, nick string) *IRCBot { |
| return &IRCBot{ |
| server: server, |
| port: "6667", |
| nick: nick, |
| channel: channel, |
| conn: nil, |
| user: nick, |
| } |
| } |
| |
| func (bot *IRCBot) Connect() { |
| conn, err := net.Dial("tcp", bot.server+":"+bot.port) |
| if err != nil { |
| log.Fatal("unable to connect to IRC server ", err) |
| } |
| bot.conn = conn |
| log.Printf("Connected to IRC server %s (%s)\n", |
| bot.server, bot.conn.RemoteAddr()) |
| bot.tpReader = textproto.NewReader(bufio.NewReader(bot.conn)) |
| bot.Sendf("USER %s 8 * :%s\r\n", bot.nick, bot.nick) |
| bot.Sendf("NICK %s\r\n", bot.nick) |
| bot.Sendf("JOIN %s\r\n", bot.channel) |
| } |
| |
| func (bot *IRCBot) CheckConnection() { |
| for { |
| line, err := bot.ReadLine() |
| if err != nil { |
| log.Fatal("Unable to read a line during checking the connection.") |
| } |
| if strings.Contains(line, "004") { |
| log.Println("The nick accepted.") |
| } else if strings.Contains(line, "433") { |
| log.Fatal("The nick is already in use.") |
| } else if strings.Contains(line, "366") { |
| log.Println("Starting to publish messages.") |
| return |
| } |
| } |
| } |
| |
| func (bot *IRCBot) Sendf(format string, args ...interface{}) { |
| fmt.Fprintf(bot.conn, format, args...) |
| } |
| |
| func (bot *IRCBot) Close() { |
| bot.conn.Close() |
| } |
| |
| func (bot *IRCBot) ReadLine() (line string, err error) { |
| return bot.tpReader.ReadLine() |
| } |
| |
| func init() { |
| registerDemo("pubsub", pubsub.PubsubScope, pubsubMain) |
| } |
| |
| func pubsubUsage() { |
| fmt.Fprint(os.Stderr, USAGE) |
| } |
| |
| // Returns a fully qualified resource name for Cloud Pub/Sub. |
| func fqrn(res, proj, name string) string { |
| return fmt.Sprintf("/%s/%s/%s", res, proj, name) |
| } |
| |
| func fullTopicName(proj, topic string) string { |
| return fqrn("topics", proj, topic) |
| } |
| |
| func fullSubscriptionName(proj, topic string) string { |
| return fqrn("subscriptions", proj, topic) |
| } |
| |
| // Check the length of the arguments. |
| func checkArgs(argv []string, min int) { |
| if len(argv) < min { |
| pubsubUsage() |
| os.Exit(2) |
| } |
| } |
| |
| func listTopics(service *pubsub.Service, argv []string) { |
| var nextPageToken string = "" |
| for { |
| query := service.Topics.List().Query( |
| fmt.Sprintf( |
| "cloud.googleapis.com/project in (/projects/%s)", |
| argv[0])).PageToken(nextPageToken) |
| topicsList, err := query.Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| for _, topic := range topicsList.Topic { |
| fmt.Println(topic.Name) |
| } |
| nextPageToken = topicsList.NextPageToken |
| if nextPageToken == "" { |
| break |
| } |
| } |
| } |
| |
| func createTopic(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 3) |
| topic := &pubsub.Topic{Name: fullTopicName(argv[0], argv[2])} |
| topic, err := service.Topics.Create(topic).Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| fmt.Printf("Topic %s was created.\n", topic.Name) |
| } |
| |
| func deleteTopic(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 3) |
| topicName := fullTopicName(argv[0], argv[2]) |
| err := service.Topics.Delete(topicName).Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| fmt.Printf("Topic %s was deleted.\n", topicName) |
| } |
| |
| func listSubscriptions(service *pubsub.Service, argv []string) { |
| var nextPageToken string = "" |
| for { |
| query := service.Subscriptions.List().Query( |
| fmt.Sprintf( |
| "cloud.googleapis.com/project in (/projects/%s)", |
| argv[0])).PageToken(nextPageToken) |
| subscriptionsList, err := query.Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| for _, subscription := range subscriptionsList.Subscription { |
| sub_text, _ := json.MarshalIndent(subscription, "", " ") |
| fmt.Printf("%s\n", sub_text) |
| } |
| nextPageToken = subscriptionsList.NextPageToken |
| if nextPageToken == "" { |
| break |
| } |
| } |
| } |
| |
| func createSubscription(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 4) |
| subscription := &pubsub.Subscription{ |
| Name: fullSubscriptionName(argv[0], argv[2]), |
| Topic: fullTopicName(argv[0], argv[3]), |
| } |
| subscription, err := service.Subscriptions.Create(subscription).Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| fmt.Printf("Subscription %s was created.\n", subscription.Name) |
| } |
| |
| func deleteSubscription(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 3) |
| subscriptionName := fullSubscriptionName(argv[0], argv[2]) |
| err := service.Subscriptions.Delete(subscriptionName).Do() |
| if err != nil { |
| log.Fatal("Got an error: %v", err) |
| } |
| fmt.Printf("Subscription %s was deleted.\n", subscriptionName) |
| } |
| |
| func connectIRC(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 5) |
| topicName := fullTopicName(argv[0], argv[2]) |
| server := argv[3] |
| channel := argv[4] |
| nick := fmt.Sprintf("bot-%s", argv[2]) |
| ircbot := NewIRCBot(server, channel, nick) |
| ircbot.Connect() |
| defer ircbot.Close() |
| ircbot.CheckConnection() |
| privMark := fmt.Sprintf("PRIVMSG %s :", ircbot.channel) |
| for { |
| line, err := ircbot.ReadLine() |
| if err != nil { |
| log.Fatal("Unable to read a line from the connection.") |
| } |
| parts := strings.Split(line, " ") |
| if len(parts) > 0 && parts[0] == "PING" { |
| ircbot.Sendf("PONG %s\r\n", parts[1]) |
| } else { |
| pos := strings.Index(line, privMark) |
| if pos == -1 { |
| continue |
| } |
| privMsg := line[pos+len(privMark) : len(line)] |
| pubsubMessage := &pubsub.PubsubMessage{ |
| Data: base64.StdEncoding.EncodeToString([]byte(privMsg)), |
| } |
| publishRequest := &pubsub.PublishRequest{ |
| Message: pubsubMessage, |
| Topic: topicName, |
| } |
| service.Topics.Publish(publishRequest).Do() |
| log.Println("Published a message to the topic.") |
| } |
| } |
| } |
| |
| func pullMessages(service *pubsub.Service, argv []string) { |
| checkArgs(argv, 3) |
| subscriptionName := fullSubscriptionName(argv[0], argv[2]) |
| pullRequest := &pubsub.PullRequest{ |
| ReturnImmediately: false, |
| Subscription: subscriptionName, |
| } |
| for { |
| pullResponse, err := service.Subscriptions.Pull(pullRequest).Do() |
| if err != nil { |
| log.Fatal("Got an error while pull a message: %v", err) |
| } |
| if pullResponse.PubsubEvent.Message != nil { |
| data, err := base64.StdEncoding.DecodeString( |
| pullResponse.PubsubEvent.Message.Data) |
| if err != nil { |
| log.Fatal("Got an error while decoding the message: %v", err) |
| } |
| fmt.Printf("%s\n", data) |
| ackRequest := &pubsub.AcknowledgeRequest{ |
| AckId: []string{pullResponse.AckId}, |
| Subscription: subscriptionName, |
| } |
| service.Subscriptions.Acknowledge(ackRequest).Do() |
| } |
| } |
| } |
| |
| // This example demonstrates calling the Cloud Pub/Sub API. As of 20 |
| // Aug 2014, the Cloud Pub/Sub API is only available if you're |
| // whitelisted. If you're interested in using it, please apply for the |
| // Limited Preview program at the following form: |
| // http://goo.gl/Wql9HL |
| // |
| // Also, before running this example, be sure to enable Cloud Pub/Sub |
| // service on your project in Developer Console at: |
| // https://console.developers.google.com/ |
| // |
| // It has 8 subcommands as follows: |
| // |
| // PROJ list_topics |
| // PROJ create_topic TOPIC |
| // PROJ delete_topic TOPIC |
| // PROJ list_subscriptions |
| // PROJ create_subscription SUBSCRIPTION LINKED_TOPIC |
| // PROJ delete_subscription SUBSCRIPTION |
| // PROJ connect_irc TOPIC SERVER CHANNEL |
| // PROJ pull_messages SUBSCRIPTION |
| // |
| // You can use either of your alphanumerical or numerial Cloud Project |
| // ID for PROJ. You can choose any names for TOPIC and SUBSCRIPTION as |
| // long as they follow the naming rule described at: |
| // https://developers.google.com/pubsub/overview#names |
| // |
| // You can list/create/delete topics/subscriptions by self-explanatory |
| // subcommands, as well as connect to an IRC channel and publish |
| // messages from the IRC channel to a specified Cloud Pub/Sub topic by |
| // the "connect_irc" subcommand, or continuously pull messages from a |
| // specified Cloud Pub/Sub subscription and display the data by the |
| // "pull_messages" subcommand. |
| func pubsubMain(client *http.Client, argv []string) { |
| checkArgs(argv, 2) |
| service, _ := pubsub.New(client) |
| m := map[string]func(service *pubsub.Service, argv []string){ |
| "list_topics": listTopics, |
| "list_subscriptions": listSubscriptions, |
| "create_topic": createTopic, |
| "delete_topic": deleteTopic, |
| "create_subscription": createSubscription, |
| "delete_subscription": deleteSubscription, |
| "connect_irc": connectIRC, |
| "pull_messages": pullMessages, |
| } |
| f, ok := m[argv[1]] |
| if !ok { |
| pubsubUsage() |
| os.Exit(2) |
| } |
| f(service, argv) |
| } |