Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for logs_integration_pipeline #170

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ the resources on the destination, and saves locally what has been pushed.
- **dashboards**
- **dashboard_lists**
- **logs_custom_pipelines**
- **logs_integration_pipelines**
- **notebooks**
- **host_tags**
- **logs_indexes**
Expand Down Expand Up @@ -222,6 +223,7 @@ See [Supported resources](#supported-resources) section below for potential reso
| dashboards | monitors, roles, service_level_objectives |
| dashboard_lists | dashboards |
| logs_custom_pipelines | - |
| logs_integration_pipelines | - |
| notebooks | - |
| host_tags | - |
| logs_indexes | - |
Expand Down
91 changes: 91 additions & 0 deletions datadog_sync/model/logs_integration_pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Unless explicitly stated otherwise all files in this repository are licensed
# under the 3-clause BSD style license (see LICENSE).
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, List, Dict, cast

from datadog_sync.utils.base_resource import BaseResource, ResourceConfig

if TYPE_CHECKING:
from datadog_sync.utils.custom_client import CustomClient


class LogsIntegrationPipelines(BaseResource):
resource_type = "logs_integration_pipelines"
resource_config = ResourceConfig(
concurrent=False,
base_path="/api/v1/logs/config/pipelines",
excluded_attributes=["id", "is_read_only", "type", "processors", "filter"],
)
# Additional LogsIntegrationPipelines specific attributes
destination_integration_pipelines: Dict[str, Dict] = dict()

def get_resources(self, client: CustomClient) -> List[Dict]:
resp = client.get(self.resource_config.base_path).json()

return resp

def import_resource(self, _id: Optional[str] = None, resource: Optional[Dict] = None) -> None:
if _id:
source_client = self.config.source_client
resource = source_client.get(self.resource_config.base_path + f"/{_id}").json()

resource = cast(dict, resource)
if not resource["is_read_only"]:
return

# Normalize name for the integration pipeline
resource["name"] = resource["name"].lower()

self.resource_config.source_resources[resource["id"]] = resource

def pre_resource_action_hook(self, _id, resource: Dict) -> None:
pass

def pre_apply_hook(self) -> None:
self.destination_integration_pipelines = self.get_destination_integration_pipelines()

def create_resource(self, _id: str, resource: Dict) -> None:
if resource["name"] not in self.destination_integration_pipelines:
raise Exception(
"resource cannot be created only updated." +
f"Skipping sync. Enable integration pipeline {resource['name']}",
)

self.resource_config.destination_resources[_id] = self.destination_integration_pipelines[resource["name"]]
self.update_resource(_id, resource)

def update_resource(self, _id: str, resource: Dict) -> None:
destination_client = self.config.destination_client
resource["filter"] = {}
resp = destination_client.put(
self.resource_config.base_path + f"/{self.resource_config.destination_resources[_id]['id']}",
resource,
).json()

# Normalize name for the integration pipeline
resp["name"] = resp["name"].lower()

self.resource_config.destination_resources[_id] = resp

def delete_resource(self, _id: str) -> None:
raise Exception("resource cannot be deleted.")

def connect_id(self, key: str, r_obj: Dict, resource_to_connect: str) -> Optional[List[str]]:
pass

def get_destination_integration_pipelines(self):
destination_integration_pipelines_obj = {}
destination_client = self.config.destination_client

resp = self.get_resources(destination_client)
for pipeline in resp:
if pipeline["is_read_only"]:
# Normalize name for the integration pipeline
pipeline["name"] = pipeline["name"].lower()

destination_integration_pipelines_obj[pipeline["name"]] = pipeline

return destination_integration_pipelines_obj
1 change: 1 addition & 0 deletions datadog_sync/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datadog_sync.model.synthetics_private_locations import SyntheticsPrivateLocations
from datadog_sync.model.synthetics_global_variables import SyntheticsGlobalVariables
from datadog_sync.model.logs_custom_pipelines import LogsCustomPipelines
from datadog_sync.model.logs_integration_pipelines import LogsIntegrationPipelines
from datadog_sync.model.notebooks import Notebooks
from datadog_sync.model.logs_metrics import LogsMetrics
from datadog_sync.model.host_tags import HostTags
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2023-09-06T14:16:03.759216-04:00
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2023-09-06T14:16:02.743972-04:00
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Content-Type:
- application/json
method: GET
uri: https://api.datadoghq.com/api/v1/logs/config/pipelines
response:
body:
string: '[{"id": "Rvk36Rj0SJizOBX_N24BSQ", "type": "pipeline", "name": "Test
pipeline", "is_enabled": true, "is_read_only": false, "filter": {"query":
"test:query"}, "processors": []}, {"id": "0i9e0i3ET6WPZV1b9ZeoPg", "type":
"pipeline", "name": "Test Pipeline - Complex", "is_enabled": true, "is_read_only":
false, "filter": {"query": "source:foo"}, "processors": [{"name": "sample
arithmetic processor", "is_enabled": true, "expression": "(time1 - time2)*1000",
"target": "my_arithmetic", "is_replace_missing": true, "type": "arithmetic-processor"},
{"name": "sample attribute processor", "is_enabled": true, "sources": ["db.instance"],
"source_type": "tag", "target": "db", "target_type": "attribute", "target_format":
"string", "preserve_source": true, "override_on_conflict": false, "type":
"attribute-remapper"}, {"name": "sample category processor", "is_enabled":
true, "categories": [{"filter": {"query": "@severity: \".\""}, "name": "debug"},
{"filter": {"query": "@severity: \"-\""}, "name": "verbose"}], "target": "foo.severity",
"type": "category-processor"}, {"name": "sample date remapper", "is_enabled":
true, "sources": ["_timestamp", "published_date"], "type": "date-remapper"},
{"name": "sample geo ip parser", "is_enabled": true, "sources": ["network.client.ip"],
"target": "network.client.geoip", "ip_processing_behavior": "do-nothing",
"type": "geo-ip-parser"}, {"name": "sample grok parser", "is_enabled": true,
"source": "message", "samples": ["sample log 1"], "grok": {"support_rules":
"", "match_rules": "Rule %{word:my_word2} %{number:my_float2}"}, "type": "grok-parser"},
{"name": "sample lookup processor", "is_enabled": true, "source": "service_id",
"target": "service_name", "lookup_table": ["1,my service"], "default_lookup":
"unknown service", "type": "lookup-processor"}, {"name": "sample message remapper",
"is_enabled": true, "sources": ["msg"], "type": "message-remapper"}, {"type":
"pipeline", "name": "nested pipeline", "is_enabled": true, "filter": {"query":
"source:foo"}, "processors": [{"name": "sample url parser", "is_enabled":
false, "sources": ["url", "extra"], "target": "http_url", "normalize_ending_slashes":
true, "type": "url-parser"}]}, {"name": "sample service remapper", "is_enabled":
true, "sources": ["service"], "type": "service-remapper"}, {"name": "sample
status remapper", "is_enabled": true, "sources": ["info", "trace"], "type":
"status-remapper"}, {"name": "sample string builder processor", "is_enabled":
true, "template": "%{user.name} logged in at %{timestamp}", "target": "user_activity",
"is_replace_missing": false, "type": "string-builder-processor"}, {"name":
"sample trace id remapper", "is_enabled": true, "sources": ["dd.trace_id"],
"type": "trace-id-remapper"}, {"name": "sample user agent parser", "is_enabled":
true, "sources": ["user", "agent"], "target": "http_agent", "is_encoded":
false, "type": "user-agent-parser"}]}, {"id": "INocz-I8T6247bE8XOKGcg", "type":
"pipeline", "name": "Sinatra", "is_enabled": true, "is_read_only": true, "filter":
{"query": "source:sinatra"}, "processors": [{"name": "Parsing Sinatra logs",
"is_enabled": true, "source": "message", "samples": ["127.0.0.1 - - [15/Jul/2018:17:41:40
+0000] \"GET /uptime_status HTTP/1.1\" 200 34 0.0004", "127.0.0.1 - - [15/Jul/2018
23:40:31] \"GET /uptime_status HTTP/1.1\" 200 6997 1.8096"], "grok": {"support_rules":
"_auth %{notSpace:http.auth:nullIf(\"-\")}\n_bytes_written %{integer:network.bytes_written}\n_client_ip
%{ipOrHost:network.client.ip}\n_version HTTP\\/%{regex(\"\\\\d+\\\\.\\\\d+\"):http.version}\n_url
%{notSpace:http.url}\n_ident %{notSpace:http.ident:nullIf(\"-\")}\n_status_code
%{integer:http.status_code}\n_method %{word:http.method}\n_date_access (%{date(\"dd/MMM/yyyy:HH:mm:ss
Z\"):date_access}|%{date(\"dd/MMM/yyyy HH:mm:ss\"):date_access})\n_duration
%{number:duration:scale(1000000000)}\n", "match_rules": "access.common %{_client_ip}
%{_ident} %{_auth} \\[%{_date_access}\\] \"(?>%{_method} |)%{_url}(?> %{_version}|)\"
%{_status_code} (?>%{_bytes_written}|-)( %{_duration}|-)?\n"}, "type": "grok-parser"},
{"name": "", "is_enabled": true, "sources": ["http.url"], "target": "http.url_details",
"normalize_ending_slashes": false, "type": "url-parser"}, {"name": "Define
`date_access` as the official date of the log", "is_enabled": true, "sources":
["date_access"], "type": "date-remapper"}, {"name": "Categorise status code",
"is_enabled": true, "categories": [{"filter": {"query": "@http.status_code:[200
TO 299]"}, "name": "OK"}, {"filter": {"query": "@http.status_code:[300 TO
399]"}, "name": "notice"}, {"filter": {"query": "@http.status_code:[400 TO
499]"}, "name": "warning"}, {"filter": {"query": "@http.status_code:[500 TO
599]"}, "name": "error"}], "target": "http.status_category", "type": "category-processor"},
{"name": "Define `http.status_category` as the official status of the log",
"is_enabled": true, "sources": ["http.status_category"], "type": "status-remapper"}]},
{"id": "_kLFOihfR-Sg81CWzLRiww", "type": "pipeline", "name": "Nginx", "is_enabled":
true, "is_read_only": true, "filter": {"query": "source:nginx"}, "processors":
[{"name": "Parsing Nginx logs", "is_enabled": true, "source": "message", "samples":
["127.0.0.1 - frank [13/Jul/2016:10:55:36 +0000] \"GET /apache_pb.gif HTTP/1.0\"
200 2326", "172.17.0.1 - - [06/Jan/2017:16:16:37 +0000] \"GET /datadoghq/company?test=var1%20Pl
HTTP/1.1\" 200 612 \"http://www.perdu.com/\" \"Mozilla/5.0 (X11; Linux x86_64)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36\"
\"-\"", "2017/09/26 14:36:50 [error] 8409#8409: *317058 \"/usr/share/nginx/html/sql/sql-admin/index.html\"
is not found (2: No such file or directory), client: 217.92.148.44, server:
localhost, request: \"HEAD http://174.138.82.103:80/sql/sql-admin/ HTTP/1.1\",
host: \"174.138.82.103\""], "grok": {"support_rules": "_auth %{notSpace:http.auth:nullIf(\"-\")}\n_bytes_written
%{integer:network.bytes_written}\n_client_ip %{ipOrHost:network.client.ip}\n_version
HTTP\\/%{regex(\"\\\\d+\\\\.\\\\d+\"):http.version}\n_url %{notSpace:http.url}\n_ident
%{notSpace:http.ident:nullIf(\"-\")}\n_user_agent %{regex(\"[^\\\\\\\"]*\"):http.useragent}\n_referer
%{notSpace:http.referer}\n_status_code %{integer:http.status_code}\n_method
%{word:http.method}\n_date_access %{date(\"dd/MMM/yyyy:HH:mm:ss Z\"):date_access}\n_x_forwarded_for
%{regex(\"[^\\\\\\\"]*\"):http._x_forwarded_for:nullIf(\"-\")}\n", "match_rules":
"access.common %{_client_ip} %{_ident} %{_auth} \\[%{_date_access}\\] \"(?>%{_method}
|)%{_url}(?> %{_version}|)\" %{_status_code} (?>%{_bytes_written}|-)\n\naccess.combined
%{access.common} (%{number:duration:scale(1000000000)} )?\"%{_referer}\" \"%{_user_agent}\"(
\"%{_x_forwarded_for}\")?.*\n\nerror.format %{date(\"yyyy/MM/dd HH:mm:ss\"):date_access}
\\[%{word:level}\\] %{data:error.message}(, %{data::keyvalue(\": \",\",\")})?\n"},
"type": "grok-parser"}, {"name": "Map `client` to `network.client.ip`", "is_enabled":
true, "sources": ["client"], "source_type": "attribute", "target": "network.client.ip",
"target_type": "attribute", "preserve_source": false, "override_on_conflict":
false, "type": "attribute-remapper"}, {"name": "Parsing Nginx Error log requests",
"is_enabled": true, "source": "request", "samples": ["HEAD http://174.138.82.103:80/sql/sql-admin/
HTTP/1.1"], "grok": {"support_rules": "_method %{word:http.method}\n_url %{notSpace:http.url}\n_version
HTTP\\/%{regex(\"\\\\d+\\\\.\\\\d+\"):http.version}\n", "match_rules": "request_parsing
(?>%{_method} |)%{_url}(?> %{_version}|)\n"}, "type": "grok-parser"}, {"name":
"", "is_enabled": true, "sources": ["http.url"], "target": "http.url_details",
"normalize_ending_slashes": false, "type": "url-parser"}, {"name": "", "is_enabled":
true, "sources": ["http.useragent"], "target": "http.useragent_details", "is_encoded":
false, "type": "user-agent-parser"}, {"name": "Define `date_access` as the
official date of the log", "is_enabled": true, "sources": ["date_access"],
"type": "date-remapper"}, {"name": "Categorise status code", "is_enabled":
true, "categories": [{"filter": {"query": "@http.status_code:[200 TO 299]"},
"name": "OK"}, {"filter": {"query": "@http.status_code:[300 TO 399]"}, "name":
"notice"}, {"filter": {"query": "@http.status_code:[400 TO 499]"}, "name":
"warning"}, {"filter": {"query": "@http.status_code:[500 TO 599]"}, "name":
"error"}], "target": "http.status_category", "type": "category-processor"},
{"name": "Define `http.status_category`, `level` as the official status of
the log", "is_enabled": true, "sources": ["http.status_category", "level"],
"type": "status-remapper"}]}]'
headers:
Content-Type:
- application/json
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2023-09-06T14:16:02.923497-04:00
Loading
Loading