(sources)
Creates a source given a name, a workspace ID, and a JSON blob containing the configuration for the source.
import airbyte_api
import dateutil.parser
from airbyte_api import models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# Every date field in this example uses the same calendar date, so parse it once.
example_date = dateutil.parser.parse('2022-07-28').date()

# Column selections for the three custom reports, in submission order.
report_columns = [
    [
        models.SourcePinterestSchemasValidEnums.TOTAL_REPIN_RATE,
    ],
    [
        models.SourcePinterestSchemasValidEnums.TOTAL_VIEW_LEAD,
    ],
    [
        models.SourcePinterestSchemasValidEnums.TOTAL_WEB_ENGAGEMENT_CHECKOUT,
        models.SourcePinterestSchemasValidEnums.TOTAL_VIEW_LEAD,
        models.SourcePinterestSchemasValidEnums.TOTAL_ENGAGEMENT_CHECKOUT,
    ],
]
custom_reports = [
    models.ReportConfig(
        columns=columns,
        name='<value>',
        start_date=example_date,
    )
    for columns in report_columns
]

# Assemble the Pinterest source configuration and wrap it in the create request.
configuration = models.SourcePinterest(
    custom_reports=custom_reports,
    start_date=example_date,
)
request = models.SourceCreateRequest(
    configuration=configuration,
    name='My Source',
    workspace_id='744cc0ed-7f05-4949-9e60-2a814f90c035',
)

res = s.sources.create_source(request=request)

# Only proceed when the response carries a source_response payload.
if res.source_response is not None:
    # handle response
    pass
api.CreateSourceResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
Delete a Source
import airbyte_api
from airbyte_api import api, models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# '<value>' stands in for the ID of the source to delete.
request = api.DeleteSourceRequest(
    source_id='<value>',
)
res = s.sources.delete_source(request=request)

if res is not None:
    # handle response
    pass
api.DeleteSourceResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
Get Source details
import airbyte_api
from airbyte_api import api, models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# '<value>' stands in for the ID of the source to fetch.
request = api.GetSourceRequest(
    source_id='<value>',
)
res = s.sources.get_source(request=request)

# Only proceed when the response carries a source_response payload.
if res.source_response is not None:
    # handle response
    pass
Parameter |
Type |
Required |
Description |
request |
api.GetSourceRequest |
✔️ |
The request object to use for the request. |
api.GetSourceResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
Given a source ID, workspace ID, and redirect URL, initiates OAuth for the source.
This returns a fully formed URL for performing user authentication against the relevant source identity provider (IdP). Once authentication has been completed, the IdP redirects to an Airbyte endpoint, which saves the access and refresh tokens as a secret and then returns the secret ID to the specified redirect URL in the `secret_id` query string parameter.
That secret ID can be used to create a source with credentials in place of actual tokens.
import airbyte_api
from airbyte_api import models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# Describe the OAuth flow to start: where to redirect, which source type, which workspace.
request = models.InitiateOauthRequest(
    redirect_url='https://cloud.airbyte.io/v1/api/oauth/callback',
    source_type=models.OAuthActorNames.GITLAB,
    workspace_id='871d9b60-11d1-44cb-8c92-c246d53bf87e',
)
res = s.sources.initiate_o_auth(request=request)

if res is not None:
    # handle response
    pass
api.InitiateOAuthResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
List sources
import airbyte_api
from airbyte_api import api, models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
s = airbyte_api.AirbyteAPI(
    security=models.Security(
        basic_auth=models.SchemeBasicAuth(
            password="",
            username="",
        ),
    ),
)

# workspace_ids is a list, so each workspace UUID is its own element
# rather than one comma-joined string.
res = s.sources.list_sources(request=api.ListSourcesRequest(
    workspace_ids=[
        'df08f6b0-b364-4cc1-9b3f-96f5d2fccfb2',
        'b0796797-de23-4fc7-a5e2-7e131314718c',
    ],
))

# Only proceed when the response carries a sources_response payload.
if res.sources_response is not None:
    # handle response
    pass
Parameter |
Type |
Required |
Description |
request |
api.ListSourcesRequest |
✔️ |
The request object to use for the request. |
api.ListSourcesResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
Update a Source
import airbyte_api
from airbyte_api import api, models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# Fields to patch on the source; '<value>' entries are placeholders.
source_patch_request = models.SourcePatchRequest(
    configuration=models.SourceDremio(
        api_key='<value>',
    ),
    name='My Source',
    workspace_id='744cc0ed-7f05-4949-9e60-2a814f90c035',
)
request = api.PatchSourceRequest(
    source_id='<value>',
    source_patch_request=source_patch_request,
)
res = s.sources.patch_source(request=request)

# Only proceed when the response carries a source_response payload.
if res.source_response is not None:
    # handle response
    pass
Parameter |
Type |
Required |
Description |
request |
api.PatchSourceRequest |
✔️ |
The request object to use for the request. |
api.PatchSourceResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |
Update a Source and fully overwrite it
import airbyte_api
import dateutil.parser
from airbyte_api import api, models
# Build the SDK client with HTTP basic auth (credentials are empty strings in this example).
security = models.Security(
    basic_auth=models.SchemeBasicAuth(
        password="",
        username="",
    ),
)
s = airbyte_api.AirbyteAPI(security=security)

# Replacement definition for the source; '<value>' entries are placeholders.
configuration = models.SourceGoogleTasks(
    api_key='<value>',
    start_date=dateutil.parser.isoparse('2024-10-11T13:59:33.977Z'),
)
source_put_request = models.SourcePutRequest(
    configuration=configuration,
    name='My Source',
)
request = api.PutSourceRequest(
    source_id='<value>',
    source_put_request=source_put_request,
)
res = s.sources.put_source(request=request)

# Only proceed when the response carries a source_response payload.
if res.source_response is not None:
    # handle response
    pass
Parameter |
Type |
Required |
Description |
request |
api.PutSourceRequest |
✔️ |
The request object to use for the request. |
api.PutSourceResponse
Error Object |
Status Code |
Content Type |
errors.SDKError |
4xx-5xx |
*/* |