I’ll provide a complete solution covering token refresh implementation, scheduler integration, and API error handling.
Token Refresh Implementation:
Create a reusable token manager module that handles automatic token refresh. Here’s a Python implementation you can use in your Jenkins jobs:
# gpsf_token_manager.py
import requests
import time
import json
from datetime import datetime, timedelta
class GPSFTokenManager:
def __init__(self, client_id, client_secret, token_url, cache_file='/tmp/gpsf_token_cache.json'):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.cache_file = cache_file
self.token_data = self._load_token_cache()
def get_valid_token(self):
# Check if token exists and is still valid (with 5-min buffer)
if self.token_data and self._is_token_valid():
return self.token_data['access_token']
# Token expired or missing - request new one
return self._request_new_token()
def _is_token_valid(self):
if 'expires_at' not in self.token_data:
return False
# Add 300-second buffer to refresh before actual expiry
return datetime.now() < (datetime.fromisoformat(self.token_data['expires_at']) - timedelta(seconds=300))
This approach checks token validity before each API call and automatically refreshes when needed. The 5-minute buffer ensures you never make an API call with an about-to-expire token.
Scheduler Integration:
For Jenkins integration, create a shared library that your scheduling jobs can import:
// vars/gpsfApiClient.groovy
def call(String endpoint, Map params = [:]) {
def tokenManager = new GPSFTokenManager(
env.GPSF_CLIENT_ID,
env.GPSF_CLIENT_SECRET,
env.GPSF_TOKEN_URL
)
def token = tokenManager.getValidToken()
// Make API call with automatic retry logic
def response = httpRequest(
url: "${env.GPSF_API_BASE_URL}${endpoint}",
httpMode: 'POST',
customHeaders: [[name: 'Authorization', value: "Bearer ${token}"]],
validResponseCodes: '200:299',
timeout: 30
)
return response
}
Then your production scheduling job becomes much simpler:
// ProductionOptimizationJob.groovy
pipeline {
agent any
triggers {
cron('0 */4 * * *') // Every 4 hours
}
stages {
stage('Optimize Production Schedule') {
steps {
script {
try {
// Health check first
def health = gpsfApiClient('/api/v1/health')
// Get current schedule
def schedule = gpsfApiClient('/api/v1/production/schedule/current')
// Optimize and update
def optimized = optimizeSchedule(schedule)
def result = gpsfApiClient('/api/v1/production/schedule/update', optimized)
echo "Schedule updated successfully: ${result.scheduleId}"
} catch (Exception e) {
// Handle errors with notification
handleApiError(e)
}
}
}
}
}
}
API Error Handling:
Implement comprehensive error handling that distinguishes between different failure scenarios:
# api_error_handler.py
import time
import logging
class GPSFApiClient:
def __init__(self, token_manager):
self.token_manager = token_manager
self.max_retries = 3
self.base_delay = 2 # seconds
def call_api(self, endpoint, method='GET', data=None):
for attempt in range(self.max_retries):
try:
token = self.token_manager.get_valid_token()
response = requests.request(
method=method,
url=f"{self.base_url}{endpoint}",
headers={"Authorization": f"Bearer {token}"},
json=data,
timeout=30
)
if response.status_code == 401:
# Token might be invalid - force refresh
self.token_manager.invalidate_cache()
if attempt < self.max_retries - 1:
continue
raise AuthenticationError("Token refresh failed")
elif response.status_code in [500, 502, 503, 504]:
# Server error - retry with exponential backoff
if attempt < self.max_retries - 1:
delay = self.base_delay * (2 ** attempt)
logging.warning(f"Server error {response.status_code}, retrying in {delay}s")
time.sleep(delay)
continue
raise ServerError(f"API server error: {response.status_code}")
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
if attempt < self.max_retries - 1:
logging.warning(f"Request timeout, retry {attempt + 1}/{self.max_retries}")
continue
raise TimeoutError("API request timed out after retries")
raise MaxRetriesExceeded("Failed after maximum retry attempts")
Key error handling features:
- 401 Unauthorized: Force token refresh and retry
- 5xx Server Errors: Exponential backoff retry (2s, 4s, 8s)
- Timeout: Retry with same delay
- Other errors: Fail fast with clear error message
Health Check Implementation:
GPSF 2023.1 provides a health endpoint at /api/v1/health. Add a pre-flight check:
def check_api_health(self):
try:
response = self.call_api('/api/v1/health', method='GET')
if response.get('status') != 'healthy':
raise HealthCheckFailed(f"API unhealthy: {response.get('message')}")
return True
except Exception as e:
logging.error(f"Health check failed: {str(e)}")
return False
def run_production_optimization(self):
# Check health before expensive operations
if not self.check_api_health():
logging.warning("Skipping job - API health check failed")
return
# Proceed with optimization...
Complete Integration Example:
Here’s how it all works together in your Jenkins job:
# production_scheduler_job.py
from gpsf_token_manager import GPSFTokenManager
from api_error_handler import GPSFApiClient
import logging
def main():
# Initialize token manager
token_mgr = GPSFTokenManager(
client_id=os.getenv('GPSF_CLIENT_ID'),
client_secret=os.getenv('GPSF_CLIENT_SECRET'),
token_url='https://gpsf.company.com/oauth/token'
)
# Initialize API client with error handling
api_client = GPSFApiClient(token_mgr)
api_client.base_url = 'https://gpsf.company.com'
try:
# Health check
if not api_client.check_api_health():
send_alert("GPSF API health check failed - job skipped")
return
# Get current schedule (token auto-refreshed if needed)
schedule = api_client.call_api('/api/v1/production/schedule/current')
# Optimize schedule
optimized = optimize_schedule(schedule)
# Update schedule
result = api_client.call_api(
'/api/v1/production/schedule/update',
method='POST',
data=optimized
)
logging.info(f"Schedule updated: {result['scheduleId']}")
except AuthenticationError as e:
send_alert(f"Authentication failed: {e}")
except ServerError as e:
send_alert(f"Server error: {e}")
except Exception as e:
send_alert(f"Unexpected error: {e}")
if __name__ == '__main__':
main()
Key Benefits:
- Automatic token refresh - Tokens refreshed 5 minutes before expiry, transparent to your jobs
- Persistent caching - Token survives job restarts, reduces token requests
- Exponential backoff - Graceful handling of temporary API outages
- Health checks - Prevents partial updates when API is degraded
- Clear error handling - Distinguishes auth failures from server errors for better alerting
Implement this pattern and your scheduled jobs will run reliably without manual token management or silent failures. The token manager handles all the complexity of OAuth token lifecycle, while the error handler ensures robust operation even when the API has issues.