From 2793577ba126c6d0c71d8aa51e7312d22a6cd95b Mon Sep 17 00:00:00 2001 From: David Przybilla Date: Wed, 16 May 2018 11:42:58 +0900 Subject: [PATCH 1/3] adding filter expressions --- cmd/fetch.go | 6 ++++-- lib/cwreader.go | 13 +++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cmd/fetch.go b/cmd/fetch.go index dbfb599..339f4aa 100644 --- a/cmd/fetch.go +++ b/cmd/fetch.go @@ -41,6 +41,7 @@ var ( verbose bool raw bool maxStreams int + filter string ) // Error messages @@ -67,6 +68,7 @@ func init() { fetchCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose log output (includes log context in data fields)") fetchCmd.Flags().BoolVarP(&raw, "raw", "r", false, "Raw JSON output") fetchCmd.Flags().IntVarP(&maxStreams, "max-streams", "m", 100, "Maximum number of streams to fetch from (for prefix search)") + fetchCmd.Flags().StringVarP(&filter, "query", "q", "", "Filter and Pattern Syntax") } func fetch(cmd *cobra.Command, args []string) error { @@ -96,7 +98,7 @@ func fetch(cmd *cobra.Command, args []string) error { lib.SetMaxStreams(maxStreams) - logReader, err := lib.NewCloudwatchLogsReader(args[0], task, start, end) + logReader, err := lib.NewCloudwatchLogsReader(args[0], task, start, end, filter) if err != nil { return err } @@ -121,7 +123,7 @@ func fetch(cmd *cobra.Command, args []string) error { ctx, cancel := events.WithSignals(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - eventChan := logReader.StreamEvents(ctx, follow) + eventChan := logReader.StreamEvents(ctx, follow, filter) ticker := time.After(7 * time.Second) diff --git a/lib/cwreader.go b/lib/cwreader.go index 8c43f74..fd6c9d5 100644 --- a/lib/cwreader.go +++ b/lib/cwreader.go @@ -77,19 +77,20 @@ func (c *CloudwatchLogsReader) ListStreams() ([]*cloudwatchlogs.LogStream, error // given in the readers constructor. The channel will be closed once // all events are read or an error occurs. You can check for errors // after the channel is closed by calling Error() -func (c *CloudwatchLogsReader) StreamEvents(ctx context.Context, follow bool) <-chan Event { +func (c *CloudwatchLogsReader) StreamEvents(ctx context.Context, follow bool, filter string) <-chan Event { eventChan := make(chan Event) - go c.pumpEvents(ctx, eventChan, follow) + go c.pumpEvents(ctx, eventChan, follow, filter) return eventChan } -func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<- Event, follow bool) { +func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<- Event, follow bool, filter string) { startTime := c.start.Unix() * 1e3 params := &cloudwatchlogs.FilterLogEventsInput{ - Interleaved: aws.Bool(true), - LogGroupName: aws.String(c.logGroupName), - StartTime: aws.Int64(startTime), + Interleaved: aws.Bool(true), + LogGroupName: aws.String(c.logGroupName), + StartTime: aws.Int64(startTime), + FilterPattern: aws.String(filter), } if !follow && c.end.IsZero() { From 5b1d1af19174ebeff20de8cbcce2290d5c64490e Mon Sep 17 00:00:00 2001 From: David Przybilla Date: Wed, 16 May 2018 11:43:15 +0900 Subject: [PATCH 2/3] allow for Messages not following ecslogs.Event from segmentio --- lib/event.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/event.go b/lib/event.go index 747c9b8..57497de 100644 --- a/lib/event.go +++ b/lib/event.go @@ -36,6 +36,8 @@ func NewEvent(cwEvent cloudwatchlogs.FilteredLogEvent, group string) Event { if err := json.Unmarshal([]byte(*cwEvent.Message), &ecsLogsEvent); err != nil { ecsLogsEvent = ecslogs.MakeEvent(ecslogs.INFO, *cwEvent.Message) ecsLogsEvent.Time = ParseAWSTimestamp(cwEvent.Timestamp) + } else { + ecsLogsEvent.Message = *cwEvent.Message } return Event{ From ee089933a6a455583426006c4af1e98491de0f3f Mon Sep 17 00:00:00 2001 From: David Przybilla Date: Wed, 16 May 2018 12:05:02 +0900 Subject: [PATCH 3/3] removing not needed param --- cmd/fetch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/fetch.go b/cmd/fetch.go index 339f4aa..4076e1b 100644 --- a/cmd/fetch.go +++ b/cmd/fetch.go @@ -98,7 +98,7 @@ func fetch(cmd *cobra.Command, args []string) error { lib.SetMaxStreams(maxStreams) - logReader, err := lib.NewCloudwatchLogsReader(args[0], task, start, end, filter) + logReader, err := lib.NewCloudwatchLogsReader(args[0], task, start, end) if err != nil { return err }