推拉
总览
推拉式并非是最简单的集成方法,但它是最可靠的方法,这就是为什么我们推荐实施这种方法,尤其是当您处理大量数据时。
推拉式是一种异步集成方法。这意味着,提交作业后,我们将迅速返回一个包含作业信息(所有提交的作业参数和作业 ID,以及用于下载结果和检查作业状态的 URL)的 JSON。通过这种集成方法,作业提交过程完全独立于下载结果。
在我们处理完您的作业之后,如果您在提交作业时提供了一个 回调 URL,我们将 POST
一个包含更新作业信息的 JSON 有效载荷(包括作业的 status
设置为 done
)到您的服务器。此时,您就可以从我们的系统中下载结果。作业完成后,我们会将结果保留至少 24 小时,以供检索。
通过推拉式,您可以将您的结果直接上传到您的云存储 (AWS S3 或 Google 云存储)。
注意:如果您不想费事搭建用于接收回调通知的服务,也可以每隔几秒钟尝试获取一次结果(这种做法称为轮询)。
您也可以尝试通过 Postman 了解推拉式的工作原理。
单一作业
描述
下面这个端点只接受一个 query
或 url
值。
端点
POST https://data.oxylabs.io/v1/queries
输入
您必须以 JSON 有效载荷发送您的作业参数,如以下代码示例所示:
# Submit a single job: the job parameters are sent as a JSON body to the queries endpoint.
# Replace user:pass1 with your own API credentials.
curl --user user:pass1 \
'https://data.oxylabs.io/v1/queries' \
-H "Content-Type: application/json" \
-d '{"source": "ENTER_SOURCE_HERE", "url": "https://www.example.com", "geo_location": "United States", "callback_url": "https://your.callback.url", "storage_type": "s3", "storage_url": "s3://your.storage.bucket.url"}'
import requests
from pprint import pprint
# Structure payload.
payload = {
    "source": "ENTER_SOURCE_HERE",  # Source you choose, e.g. "universal"
    "url": "https://www.example.com",  # Check the specific source to see whether it takes "url" or "query"
    "geo_location": "United States",  # Some sources accept a zip code or coordinates
    # "render": "html",  # Uncomment if you want to render JavaScript within the page
    # "parse": True,  # Check which sources support parsed data
    "callback_url": "https://your.callback.url",  # Needed only if you use a callback listener
    "storage_type": "s3",
    "storage_url": "s3://your.storage.bucket.url",
}

# Get response.
response = requests.request(
    'POST',
    'https://data.oxylabs.io/v1/queries',
    auth=('YOUR_USERNAME', 'YOUR_PASSWORD'),  # Your credentials go here
    json=payload,
)

# Print prettified response to stdout.
pprint(response.json())
<?php
// Job parameters; they are JSON-encoded and sent as the request body below.
$params = array(
    'source' => 'ENTER_SOURCE_HERE', // Source you choose, e.g. "universal"
    'url' => 'https://www.example.com', // Check the specific source to see whether it takes "url" or "query"
    'geo_location' => 'United States', // Some sources accept a zip code or coordinates
    //'render' => 'html', // Uncomment if you want to render JavaScript within the page
    //'parse' => TRUE, // Check which sources support parsed data
    'callback_url' => 'https://your.callback.url', // Needed only if you use a callback listener
    'storage_type' => 's3',
    'storage_url' => 's3://your.storage.bucket.url'
);

$ch = curl_init();

curl_setopt($ch, CURLOPT_URL, "https://data.oxylabs.io/v1/queries");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // Return the response instead of printing it directly
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($params));
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_USERPWD, "YOUR_USERNAME" . ":" . "YOUR_PASSWORD"); //Your credentials go here

$headers = array();
$headers[] = "Content-Type: application/json";
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);

$result = curl_exec($ch);
echo $result;

// Report transport-level failures reported by cURL.
if (curl_errno($ch)) {
    echo 'Error:' . curl_error($ch);
}
curl_close ($ch);
?>
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;
namespace OxyApi
{
    class Program
    {
        /// <summary>
        /// Submits a single job to the queries endpoint with HTTP Basic
        /// authentication and writes the raw response body to the console.
        /// </summary>
        static async Task Main()
        {
            const string Username = "YOUR_USERNAME";
            const string Password = "YOUR_PASSWORD";

            // Job parameters, serialized as the JSON request body.
            var parameters = new Dictionary<string, string>()
            {
                { "source", "ENTER_SOURCE_HERE" },
                { "url", "https://example.com" },
                { "geo_location", "United States" },
                { "callback_url", "https://your.callback.url" },
            };

            // The client, request and response are all IDisposable; release them when Main exits.
            using var client = new HttpClient();
            client.BaseAddress = new Uri("https://data.oxylabs.io");

            using var requestMessage = new HttpRequestMessage(HttpMethod.Post, "/v1/queries");
            requestMessage.Content = JsonContent.Create(parameters);

            // HTTP Basic credentials: base64 of "username:password".
            var authenticationString = $"{Username}:{Password}";
            var base64EncodedAuthenticationString = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(authenticationString));
            requestMessage.Headers.Add("Authorization", "Basic " + base64EncodedAuthenticationString);

            using var response = await client.SendAsync(requestMessage);
            var contents = await response.Content.ReadAsStringAsync();

            Console.WriteLine(contents);
        }
    }
}
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
// main submits a single job to the queries endpoint using HTTP Basic
// authentication and prints the raw response body.
func main() {
	const Username = "YOUR_USERNAME"
	const Password = "YOUR_PASSWORD"

	// Job parameters, sent as the JSON request body.
	payload := map[string]string{
		"source":       "ENTER_SOURCE_HERE",
		"url":          "https://example.com",
		"geo_location": "United States",
		"callback_url": "https://your.callback.url",
	}

	jsonValue, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	client := &http.Client{}
	request, err := http.NewRequest("POST",
		"https://data.oxylabs.io/v1/queries",
		bytes.NewBuffer(jsonValue),
	)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	request.Header.Add("Content-type", "application/json")
	request.SetBasicAuth(Username, Password)

	// Stop on a failed request: the response must not be dereferenced when err is non-nil.
	response, err := client.Do(request)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	defer response.Body.Close()

	responseText, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	fmt.Println(string(responseText))
}
import okhttp3.*;
import org.json.JSONObject;
/**
 * Submits a single job to the queries endpoint and prints the response body.
 */
public class Main implements Runnable {
    private static final String AUTHORIZATION_HEADER = "Authorization";
    public static final String USERNAME = "YOUR_USERNAME";
    public static final String PASSWORD = "YOUR_PASSWORD";

    /** Builds the job payload, sends it as a POST request and prints the reply. */
    public void run() {
        // Job parameters sent as the JSON request body.
        JSONObject jobParameters = new JSONObject();
        jobParameters.put("source", "ENTER_SOURCE_HERE");
        jobParameters.put("url", "https://example.com");
        jobParameters.put("geo_location", "United States");
        jobParameters.put("callback_url", "https://your.callback.url");

        // Authenticator that rebuilds the request it is handed with a Basic Authorization header.
        Authenticator basicAuth = (route, received) -> received
                .request()
                .newBuilder()
                .header(AUTHORIZATION_HEADER, Credentials.basic(USERNAME, PASSWORD))
                .build();

        OkHttpClient httpClient = new OkHttpClient.Builder()
                .authenticator(basicAuth)
                .build();

        MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
        RequestBody payload = RequestBody.create(jobParameters.toString(), jsonType);

        Request submitJob = new Request.Builder()
                .url("https://data.oxylabs.io/v1/queries")
                .post(payload)
                .build();

        try (Response reply = httpClient.newCall(submitJob).execute()) {
            assert reply.body() != null;
            System.out.println(reply.body().string());
        } catch (Exception failure) {
            System.out.println("Error: " + failure.getMessage());
        }

        System.exit(0);
    }

    /** Runs the example on a separate thread. */
    public static void main(String[] args) {
        Thread worker = new Thread(new Main());
        worker.start();
    }
}
import fetch from 'node-fetch';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';

// Job parameters sent as the JSON request body.
const body = {
  source: 'ENTER_SOURCE_HERE',
  url: 'https://www.example.com',
  geo_location: 'United States',
  callback_url: 'https://your.callback.url',
};

// Submit the job; the Authorization header carries base64("username:password").
const response = await fetch('https://data.oxylabs.io/v1/queries', {
  method: 'post',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Basic ${Buffer.from(username + ':' + password).toString('base64')}`,
  },
  body: JSON.stringify(body),
});

// Print the parsed JSON response.
console.log(await response.json());
输出
API 将响应一个包含作业信息的 JSON,具体如下:
{
"callback_url": "https://your.callback.url",
"client_id": 5,
"context": [
{
"key": "results_language",
"value": null
},
{
"key": "safe_search",
"value": null
},
{
"key": "tbm",
"value": null
},
{
"key": "cr",
"value": null
},
{
"key": "filter",
"value": null
}
],
"created_at": "2019-10-01 00:00:01",
"domain": "com",
"geo_location": "United States",
"id": "12345678900987654321",
"limit": 10,
"locale": null,
"pages": 1,
"parse": false,
"render": null,
"url": "https://www.example.com",
"source": "universal",
"start_page": 1,
"status": "pending",
"storage_type": "s3",
"storage_url": "YOUR_BUCKET_NAME/12345678900987654321.json",
"subdomain": "www",
"updated_at": "2019-10-01 00:00:01",
"user_agent_type": "desktop",
"_links": [
{
"rel": "self",
"href": "http://data.oxylabs.io/v1/queries/12345678900987654321",
"method": "GET"
},
{
"rel": "results",
"href": "http://data.oxylabs.io/v1/queries/12345678900987654321/results",
"method": "GET"
}
]
}
检查作业状态
描述
如果您在提交作业时提供了有效的 callback_url
值,那么在作业完成后,我们将 POST
一个 JSON 有效载荷到您指定的回调 URL。该 JSON 有效载荷将表明作业已经完成,其状态被设置为 done
。
然而,如果您提交的作业没有callback_url
,则可以自己检查作业状态。要做到这一点,使用 rel
:self
中的 href
URL,取自您提交作业后收到的响应信息。
检查作业状态的 URL 看起来与此类似:http://data.oxylabs.io/v1/queries/12345678900987654321
。查询这个 URL 将返回作业信息,包括其status
。
可能的 status
值共有 3 种:
- pending:作业仍在处理中,尚未完成。
- done:作业已完成,可以检索结果。
- faulted:作业处理出现问题,未能完成。
端点
GET https://data.oxylabs.io/v1/queries/{id}
输入
# Check the status of a job. Uses HTTPS so the Basic-auth credentials are not sent in clear text.
curl --user user:pass1 \
'https://data.oxylabs.io/v1/queries/12345678900987654321'
import requests
from pprint import pprint
# Get response from the job status endpoint.
# HTTPS keeps the Basic-auth credentials from being sent in clear text.
response = requests.request(
    method='GET',
    url='https://data.oxylabs.io/v1/queries/12345678900987654321',
    auth=('user', 'pass1'),
)

# Print prettified JSON response to stdout.
pprint(response.json())
<?php
// Fetch the job information (including its status) for one job ID.
$ch = curl_init();

// HTTPS keeps the Basic-auth credentials from being sent in clear text.
curl_setopt($ch, CURLOPT_URL, "https://data.oxylabs.io/v1/queries/12345678900987654321");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // Return the response instead of printing it directly
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "GET");
curl_setopt($ch, CURLOPT_USERPWD, "user" . ":" . "pass1");

$result = curl_exec($ch);
echo $result;

// Report transport-level failures reported by cURL.
if (curl_errno($ch)) {
    echo 'Error:' . curl_error($ch);
}
curl_close ($ch);
?>
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Json;
using System.Threading.Tasks;
namespace OxyApi
{
    class Program
    {
        /// <summary>
        /// Requests the job information for <c>JobId</c> with HTTP Basic
        /// authentication and writes the raw response body to the console.
        /// </summary>
        static async Task Main()
        {
            const string JobId = "12345678900987654321";
            const string Username = "YOUR_USERNAME";
            const string Password = "YOUR_PASSWORD";

            // The client, request and response are all IDisposable; release them when Main exits.
            using var client = new HttpClient();
            client.BaseAddress = new Uri("https://data.oxylabs.io");

            using var requestMessage = new HttpRequestMessage(HttpMethod.Get, $"/v1/queries/{JobId}");

            // HTTP Basic credentials: base64 of "username:password".
            var authenticationString = $"{Username}:{Password}";
            var base64EncodedAuthenticationString = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(authenticationString));
            requestMessage.Headers.Add("Authorization", "Basic " + base64EncodedAuthenticationString);

            using var response = await client.SendAsync(requestMessage);
            var contents = await response.Content.ReadAsStringAsync();

            Console.WriteLine(contents);
        }
    }
}
package main
import (
"fmt"
"io/ioutil"
"net/http"
)
// main requests the job information for JobId using HTTP Basic
// authentication and prints the raw response body.
func main() {
	const JobId = "12345678900987654321"
	const Username = "YOUR_USERNAME"
	const Password = "YOUR_PASSWORD"

	client := &http.Client{}
	request, err := http.NewRequest("GET",
		fmt.Sprintf("https://data.oxylabs.io/v1/queries/%s", JobId),
		nil,
	)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	request.Header.Add("Content-type", "application/json")
	request.SetBasicAuth(Username, Password)

	// Stop on a failed request: the response must not be dereferenced when err is non-nil.
	response, err := client.Do(request)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	defer response.Body.Close()

	responseText, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	fmt.Println(string(responseText))
}
import okhttp3.*;
/**
 * Requests the job information for {@code JOB_ID} and prints the response body.
 */
public class Main implements Runnable {
    private static final String AUTHORIZATION_HEADER = "Authorization";
    private static final String JOB_ID = "12345678900987654321";
    public static final String USERNAME = "YOUR_USERNAME";
    public static final String PASSWORD = "YOUR_PASSWORD";

    /** Sends a GET request for the job and prints the reply. */
    public void run() {
        // Authenticator that rebuilds the request it is handed with a Basic Authorization header.
        Authenticator basicAuth = (route, received) -> received
                .request()
                .newBuilder()
                .header(AUTHORIZATION_HEADER, Credentials.basic(USERNAME, PASSWORD))
                .build();

        OkHttpClient httpClient = new OkHttpClient.Builder()
                .authenticator(basicAuth)
                .build();

        String jobUrl = String.format("https://data.oxylabs.io/v1/queries/%s", JOB_ID);
        Request jobStatus = new Request.Builder()
                .url(jobUrl)
                .get()
                .build();

        try (Response reply = httpClient.newCall(jobStatus).execute()) {
            assert reply.body() != null;
            System.out.println(reply.body().string());
        } catch (Exception failure) {
            System.out.println("Error: " + failure.getMessage());
        }

        System.exit(0);
    }

    /** Runs the example on a separate thread. */
    public static void main(String[] args) {
        Thread worker = new Thread(new Main());
        worker.start();
    }
}
import fetch from 'node-fetch';
const jobId = '12345678900987654321';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';

// Request the job information; the Authorization header carries base64("username:password").
const response = await fetch('https://data.oxylabs.io/v1/queries/' + jobId, {
  method: 'get',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Basic ${Buffer.from(username + ':' + password).toString('base64')}`,
  },
});

// Print the parsed JSON response.
console.log(await response.json());
输出
{
"client_id": 5,
"context": [
{
"key": "results_language",
"value": null
},
{
"key": "safe_search",
"value": null
},
{
"key": "tbm",
"value": null
},
{
"key": "cr",
"value": null
},
{
"key": "filter",
"value": null
}
],
"created_at": "2019-10-01 00:00:01",
"domain": "com",
"geo_location": null,
"id": "12345678900987654321",
"limit": 10,
"locale": null,
"pages": 1,
"parse": false,
"render": null,
"query": "adidas",
"source": "google_shopping_search",
"start_page": 1,
"status": "done",
"subdomain": "www",
"updated_at": "2019-10-01 00:00:15",
"user_agent_type": "desktop",
"_links": [
{
"rel": "self",
"href": "http://data.oxylabs.io/v1/queries/12345678900987654321",
"method": "GET"
},
{
"rel": "results",
"href": "http://data.oxylabs.io/v1/queries/12345678900987654321/results",
"method": "GET"
}
]
}
API 将在响应主体中以 JSON 格式返回查询信息。请注意,作业的 status
已变为 done
。您现在可以通过向 http://data.oxylabs.io/v1/queries/12345678900987654321/results
发送查询来检索内容。您还可以看到,该作业的 updated_at
为 2019-10-01 00:00:15
——作业花了 14 秒完成。
检索作业内容
描述
在得知作业结果已可检索后,您便可使用作业信息响应中 rel
:results
下的 href
URL 发起 GET
请求。结果链接看起来像这样:http://data.oxylabs.io/v1/queries/12345678900987654321/results
。
通过设置回调服务,无需定期检查作业状态即可自动检索结果。要做到这一点,请在提交作业时指定一个能够接受传入 HTTP 请求的服务器的 URL。当我们的系统完成作业时,它将 POST
一个 JSON 有效载荷到所提供的 URL,而回调服务将下载结果,正如回调实现示例所示。
端点
GET https://data.oxylabs.io/v1/queries/{id}/results
输入
以下代码示例展示了如何使用/results
端点。
# Retrieve the results of a job. Uses HTTPS so the Basic-auth credentials are not sent in clear text.
curl --user user:pass1 \
'https://data.oxylabs.io/v1/queries/12345678900987654321/results'
import requests
from pprint import pprint
# Get response from the results endpoint.
# HTTPS keeps the Basic-auth credentials from being sent in clear text.
response = requests.request(
    method='GET',
    url='https://data.oxylabs.io/v1/queries/12345678900987654321/results',
    auth=('user', 'pass1'),
)

# Print prettified JSON response to stdout.
pprint(response.json())
<?php
// Fetch the results of one job by its ID.
$ch = curl_init();

// HTTPS keeps the Basic-auth credentials from being sent in clear text.
curl_setopt($ch, CURLOPT_URL, "https://data.oxylabs.io/v1/queries/12345678900987654321/results");
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // Return the response instead of printing it directly
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "GET");
curl_setopt($ch, CURLOPT_USERPWD, "user" . ":" . "pass1");

$result = curl_exec($ch);
echo $result;

// Report transport-level failures reported by cURL.
if (curl_errno($ch)) {
    echo 'Error:' . curl_error($ch);
}
curl_close ($ch);
?>
using System;
using System.Net.Http;
using System.Threading.Tasks;
namespace OxyApi
{
    class Program
    {
        /// <summary>
        /// Requests the results of <c>JobId</c> with HTTP Basic
        /// authentication and writes the raw response body to the console.
        /// </summary>
        static async Task Main()
        {
            const string JobId = "12345678900987654321";
            const string Username = "YOUR_USERNAME";
            const string Password = "YOUR_PASSWORD";

            // The client, request and response are all IDisposable; release them when Main exits.
            using var client = new HttpClient();
            client.BaseAddress = new Uri("https://data.oxylabs.io");

            using var requestMessage = new HttpRequestMessage(HttpMethod.Get, $"/v1/queries/{JobId}/results");

            // HTTP Basic credentials: base64 of "username:password".
            var authenticationString = $"{Username}:{Password}";
            var base64EncodedAuthenticationString = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(authenticationString));
            requestMessage.Headers.Add("Authorization", "Basic " + base64EncodedAuthenticationString);

            using var response = await client.SendAsync(requestMessage);
            var contents = await response.Content.ReadAsStringAsync();

            Console.WriteLine(contents);
        }
    }
}
package main
import (
"fmt"
"io/ioutil"
"net/http"
)
// main requests the results of JobId using HTTP Basic authentication
// and prints the raw response body.
func main() {
	const JobId = "12345678900987654321"
	const Username = "YOUR_USERNAME"
	const Password = "YOUR_PASSWORD"

	client := &http.Client{}
	request, err := http.NewRequest("GET",
		fmt.Sprintf("https://data.oxylabs.io/v1/queries/%s/results", JobId),
		nil,
	)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	request.Header.Add("Content-type", "application/json")
	request.SetBasicAuth(Username, Password)

	// Stop on a failed request: the response must not be dereferenced when err is non-nil.
	response, err := client.Do(request)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	defer response.Body.Close()

	responseText, err := ioutil.ReadAll(response.Body)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}

	fmt.Println(string(responseText))
}
import okhttp3.*;
/**
 * Requests the results of {@code JOB_ID} and prints the response body.
 */
public class Main implements Runnable {
    private static final String AUTHORIZATION_HEADER = "Authorization";
    private static final String JOB_ID = "12345678900987654321";
    public static final String USERNAME = "YOUR_USERNAME";
    public static final String PASSWORD = "YOUR_PASSWORD";

    /** Sends a GET request to the job's results URL and prints the reply. */
    public void run() {
        // Authenticator that rebuilds the request it is handed with a Basic Authorization header.
        Authenticator basicAuth = (route, received) -> received
                .request()
                .newBuilder()
                .header(AUTHORIZATION_HEADER, Credentials.basic(USERNAME, PASSWORD))
                .build();

        OkHttpClient httpClient = new OkHttpClient.Builder()
                .authenticator(basicAuth)
                .build();

        String resultsUrl = String.format("https://data.oxylabs.io/v1/queries/%s/results", JOB_ID);
        Request jobResults = new Request.Builder()
                .url(resultsUrl)
                .get()
                .build();

        try (Response reply = httpClient.newCall(jobResults).execute()) {
            assert reply.body() != null;
            System.out.println(reply.body().string());
        } catch (Exception failure) {
            System.out.println("Error: " + failure.getMessage());
        }

        System.exit(0);
    }

    /** Runs the example on a separate thread. */
    public static void main(String[] args) {
        Thread worker = new Thread(new Main());
        worker.start();
    }
}
import fetch from 'node-fetch';
const jobId = '12345678900987654321';
const username = 'YOUR_USERNAME';
const password = 'YOUR_PASSWORD';

// Request the job results; the Authorization header carries base64("username:password").
const response = await fetch('https://data.oxylabs.io/v1/queries/' + jobId + '/results', {
  method: 'get',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Basic ${Buffer.from(username + ':' + password).toString('base64')}`,
  },
});

// Print the parsed JSON response.
console.log(await response.json());
输出
下面的 JSON 文件包含了一个 /results
端点的响应示例:
{
"results": [
{
"content": "<!doctype html><html>
CONTENT
</html>",
"created_at": "2019-10-01 00:00:01",
"updated_at": "2019-10-01 00:00:15",
"page": 1,
"url": "https://www.google.com/search?q=adidas&hl=en&gl=US",
"job_id": "12345678900987654321",
"status_code": 200
}
]
}
回调
回调是我们向您的机器发送的一个 POST
请求,通知您数据提取任务已经完成,并提供一个 URL 来下载抓取的内容。这意味着,您不再需要手动检查作业状态。一旦数据准备就绪,我们会通知您,您只需进行检索即可。请查看下面的 Python、PHP 和 C# 代码示例。
# This is a simple Sanic web server with a route listening for callbacks on localhost:8080.
# It will print job results to stdout.
import requests
from pprint import pprint
from sanic import Sanic, response
AUTH_TUPLE = ('user', 'pass1')

# Current Sanic releases refuse to create an unnamed application, so give it a name.
app = Sanic('job_listener')


# Define /job_listener endpoint that accepts POST requests.
@app.route('/job_listener', methods=['POST'])
async def job_listener(request):
    """Handle a job callback: download and print the job results.

    Looks through the ``_links`` list of the posted JSON for the entry whose
    ``rel`` is ``results``, fetches its ``href`` with ``AUTH_TUPLE`` and
    pretty-prints the JSON it returns. Any exception is printed rather than
    raised, and the handler always answers ``{"status": "ok"}`` with HTTP 200.
    """
    try:
        res = request.json
        links = res.get('_links', [])
        for link in links:
            if link['rel'] == 'results':
                # Sanic is async, but requests are synchronous, to fully take
                # advantage of Sanic, use aiohttp.
                res_response = requests.request(
                    method='GET',
                    url=link['href'],
                    auth=AUTH_TUPLE,
                )
                pprint(res_response.json())
                break
    except Exception as e:
        print("Listener exception: {}".format(e))
    return response.json(status=200, body={'status': 'ok'})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
<?php
// Callback listener: when a job callback is POSTed to this script, download
// the job results and write them to stdout.
$stdout = fopen('php://stdout', 'w');

// $_POST is always set (possibly empty), so isset($_POST) cannot tell a
// callback from any other request; check the request method instead.
if (($_SERVER['REQUEST_METHOD'] ?? '') === 'POST') {
    // Merge form fields with the decoded JSON request body.
    $payload = array_merge($_POST, (array) json_decode(file_get_contents('php://input')));

    // Only query the API when the payload actually carries a job ID.
    if (!empty($payload['id'])) {
        $ch = curl_init();

        // The ID comes from the incoming request, so escape it before placing it in the URL.
        curl_setopt($ch, CURLOPT_URL, "https://data.oxylabs.io/v1/queries/" . rawurlencode((string) $payload['id']) . '/results');
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // Return the response instead of printing it directly
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "GET");
        curl_setopt($ch, CURLOPT_USERPWD, "user" . ":" . "pass1");

        $result = curl_exec($ch);
        fwrite($stdout, $result);

        // Report transport-level failures reported by cURL.
        if (curl_errno($ch)) {
            echo 'Error:' . curl_error($ch);
        }
        curl_close ($ch);
    }
}
?>
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Net.Http;
namespace OxyApiWeb
{
/// <summary>
/// Model with a single <c>_links</c> collection.
/// NOTE(review): presumably bound from the callback JSON payload, whose job info
/// contains a "_links" array — confirm against the endpoint that consumes it.
/// </summary>
public class Callback
{
/// <summary>Link entries, each exposing a <c>rel</c> and an <c>href</c>.</summary>
public Link[] _links { get; set; }
}
/// <summary>
/// One link entry with a relation name and a target address.
/// NOTE(review): member names are lower-case, presumably to match the "rel" and
/// "href" JSON keys — confirm before renaming.
/// </summary>
public class Link
{
/// <summary>Relation name of the link (the job info samples use "self" and "results").</summary>
public string rel { get; set; }
/// <summary>URL the link points to.</summary>
public string href { get; set; }
}
public class Startup
{
private const string USERNAME = "YOUR_USERNAME";
private const string PASSWORD = "YOUR_PASSWORD";
public Startup(IConfiguration configuration)
{
Configuration = configuration;
client = new HttpClient();
}
public IConfiguration Configuration { get; }
private HttpClient client;
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{